A few Spark test programs
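The listing below is a single driver class, com.gooper.test.JavaSparkPi, which runs a Pi estimation job followed by nineteen small RDD/DataFrame examples (test through test19). As a rough sketch, assuming the class has been packaged into a jar named spark-test.jar (a hypothetical name) and that the Hive tables and HDFS paths referenced in the code exist, it could be launched with something like:
spark-submit --class com.gooper.test.JavaSparkPi --master local[*] spark-test.jar 2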
package com.gooper.test;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import com.google.gson.Gson;
public final class JavaSparkPi {
static final SparkConf sparkConf=new SparkConf().setAppName("JavaSparkPi");
static {
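// Only the last setMaster() call takes effect, so the local[*] setting below overrides the standalone cluster URL.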
sparkConf.setMaster("spark://gsda1:7077,gsda2:7077");
sparkConf.setMaster("local[*]");
}
static final JavaSparkContext jsc=new JavaSparkContext(sparkConf);
static final Broadcast<List<String>> temp = jsc.broadcast(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
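// temp is a broadcast variable: the list is serialized once, shipped to each executor, and read there
// via temp.value() (see test18). Executors must treat it as read-only.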
public static void main(String[] args) throws Exception {
System.out.println("start................");
// Estimate PI with a Monte Carlo simulation: the fraction of random points in the square [-1,1] x [-1,1]
// that fall inside the unit circle approaches PI/4, so PI is roughly 4.0 * count / n.
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map((z) -> {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
if(x * x + y * y <= 1) return 1; else return 0;
}).reduce((s, s2) -> (s + s2));
System.out.println("Pi is roughly " + 4.0 * count / n);
System.out.println("=========== test start =================================");
test(jsc);
System.out.println("=========== test end =================================");
System.out.println("=========== test2 start =================================");
test2(jsc);
System.out.println("=========== test2 end =================================");
System.out.println("=========== test3 start =================================");
test3(jsc);
System.out.println("=========== test3 end =================================");
System.out.println("=========== test4 start =================================");
test4(jsc);
System.out.println("=========== test4 end =================================");
System.out.println("=========== test5 start =================================");
test5(jsc);
System.out.println("=========== test5 end =================================");
System.out.println("=========== test6 start =================================");
test6(jsc);
System.out.println("=========== test6 end =================================");
System.out.println("=========== test7 start =================================");
test7(jsc);
System.out.println("=========== test7 end =================================");
System.out.println("=========== test8 start =================================");
test8(jsc);
System.out.println("=========== test8 end =================================");
System.out.println("=========== test9 start =================================");
test9(jsc);
System.out.println("=========== test9 end =================================");
System.out.println("=========== test10 start =================================");
test10(jsc);
System.out.println("=========== test10 end =================================");
System.out.println("=========== test11 start =================================");
test11(jsc);
System.out.println("=========== test11 end =================================");
System.out.println("=========== test12 start =================================");
test12(jsc);
System.out.println("=========== test12 end =================================");
System.out.println("=========== test13 start =================================");
test13(jsc);
System.out.println("=========== test13 end =================================");
System.out.println("=========== test14 start =================================");
test14(jsc);
System.out.println("=========== test14 end =================================");
System.out.println("=========== test15 start =================================");
test15();
System.out.println("=========== test15 end =================================");
System.out.println("=========== test16 start =================================");
test16(jsc);
System.out.println("=========== test16 end =================================");
System.out.println("=========== test17 start =================================");
test17(jsc);
System.out.println("=========== test17 end =================================");
System.out.println("=========== test18 start =================================");
test18(jsc);
System.out.println("=========== test18 end =================================");
System.out.println("=========== test19 start =================================");
test19(jsc);
System.out.println("=========== test19 end =================================");
jsc.stop();
jsc.close();
System.out.println("end................");
}
// Convert a List to an RDD, square each element with a map operation, and print the resulting JavaRDD
static void test (JavaSparkContext sc) {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaRDD<Integer> result = rdd.map((x) -> {return x*x;});
System.out.println("result ==>"+StringUtils.join(result.collect(), ","));
}
// Split strings on spaces (using a lambda expression) and print the result
static void test2 (JavaSparkContext sc) {
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap( (line) -> {return Arrays.asList(line.split(" ")).iterator();});
System.out.println("result ==>"+StringUtils.join(words.collect(), ","));
}
// Apply the RDD operations distinct, union, intersection, subtract, cartesian and countByValue and print the results
static void test3 (JavaSparkContext sc) {
JavaRDD<String> data1 = sc.parallelize(Arrays.asList("coffee", "coffee", "panda", "monkey", "tea"));
JavaRDD<String> data2 = sc.parallelize(Arrays.asList("coffee", "monkey", "kitty"));
JavaRDD<Integer> data3 = sc.parallelize(Arrays.asList(1,2,3));
System.out.println("distinct ==>"+data1.distinct().collect());
System.out.println("union ==>"+data1.union(data2).collect());
System.out.println("intersection ==>"+data1.intersection(data2).collect());
System.out.println("subtract ==>"+data1.subtract(data2).collect());
System.out.println("cartesion ==>"+data1.cartesian(data3).collect());
System.out.println("countByValue ==>"+data1.countByValue());
}
// Use persist, apply reduce and fold, then convert to a double RDD and print its mean
static void test4(JavaSparkContext sc) {
JavaRDD<Integer> data1 = sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> data2 = sc.parallelize(Arrays.asList(3,4,5));
List<Integer> data3 = new ArrayList<Integer>();
data3.add(1);
data3.add(2);
data3.add(3);
data3.add(4);
JavaRDD<Integer> map = data1.map(x -> x+1);
map.persist(StorageLevel.MEMORY_AND_DISK());
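// persist() only marks the RDD for caching; it is materialized (in memory, spilling to disk)
// the first time an action such as fold() or foreach() below runs, and reused by later actions.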
Function2<Integer, Integer, Integer> reduce = new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) {
return x+y;
}
};
DoubleFunction<Integer> df = new DoubleFunction<Integer>() {
public double call(Integer x) {
return (double) x;
}
};
System.out.println("map==>"+data1.map(x -> x+1).reduce((x, y)->{return x+y;}));
System.out.println("fold==>"+map.fold(0, reduce));
map.foreach((x)->System.out.println(x));
JavaDoubleRDD result = data1.mapToDouble((x) -> x);
System.out.println("mean ===>"+result.mean());
result.foreach((x) -> System.out.println(x));
System.out.println("--------------------------");
JavaDoubleRDD result2 = map.mapToDouble(df);
System.out.println("mean by DoubleFuntion()===>"+result2.mean());
result2.foreach((x) -> System.out.println(x));
}
// Example of using the aggregate function on a numeric JavaRDD
static void test5(JavaSparkContext sc) {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount> (){
private static final long serialVersionUID = 122222L;
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount> (){
private static final long serialVersionUID = 11111L;
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
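// aggregate(zeroValue, seqOp, combOp): addAndCount folds each element of a partition into an AvgCount,
// and combine merges the partial AvgCount results produced by the different partitions.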
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
}
// Convert a String RDD to key/value pairs with mapToPair and apply the key-based operations reduceByKey, groupByKey and sortByKey
static void test6(JavaSparkContext sc) {
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi", "hi park", "dk"));
PairFunction<String, String, Integer> keyData =
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) {
return new Tuple2(x.split(" ")[0], x.length());
}
};
JavaPairRDD<String, Integer> pairs = lines.mapToPair(keyData);
pairs.foreach(x->System.out.println(x));
JavaPairRDD<String, Integer> reduceByKey = pairs.reduceByKey( (x, y) -> { return (x+y);} );
JavaPairRDD<String, Iterable<Integer>> groupByKey = pairs.groupByKey();
JavaPairRDD<String, Integer> sortByKey = pairs.sortByKey(false);
System.out.println("reduceByKey =>"+reduceByKey.collect() );
System.out.println("groupByKey =>"+groupByKey.collect() );
System.out.println("sortBykey => "+sortByKey.collect() );
}
// Build a JavaPairRDD from an RDD of Tuple2 via mapToPair and print each RDD,
// then apply subtractByKey, subtract, join, rightOuterJoin, leftOuterJoin and cogroup to two JavaPairRDDs
static void test7(JavaSparkContext sc) {
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2("a",2));
data1.add(new Tuple2("c",4));
data1.add(new Tuple2("c",6));
data2.add(new Tuple2("c",4));
JavaRDD<Tuple2<String, Integer>> pdataa1 = sc.parallelize(data1);
JavaPairRDD<String, Integer> pdataa11 = pdataa1.mapToPair( (x) -> {return new Tuple2(x._1, x._2);} );
System.out.println("pdataa1 ==>"+pdataa1);
System.out.println("pdataa11 ==>"+pdataa11);
System.out.println("pdataa1 ==>"+pdataa1.collect());
System.out.println("pdataa11 ==>"+pdataa11.collect());
JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Integer> pdata2 = sc.parallelizePairs(data2);
System.out.println("pdata1 ==>"+pdata1.collect());
System.out.println("pdata2 ==>"+pdata2.collect());
System.out.println("subtractByKey =>"+pdataa11.subtractByKey(pdata2).collect());
System.out.println("subtract =>"+pdata1.subtract(pdata2).collect());
System.out.println("join =>"+pdata1.join(pdata2).collect());
System.out.println("rightOuterJoin =>"+pdata1.rightOuterJoin(pdata2).collect());
System.out.println("leftOuterJoin =>"+pdata1.leftOuterJoin(pdata2).collect());
System.out.println("cogroup =>"+pdata1.cogroup(pdata2).collect());
Function<Integer, Integer> ff = new Function<Integer, Integer>() {
private static final long serialVersionUID = 11234L;
int sum = 100;
public Integer call (Integer x) {
sum += x;
return sum;
}
};
System.out.println("mapValues =>" + pdata1.mapValues(ff).collect());
//System.out.println("mapValues 1=>" + pdata1.reduce( (x2._2) -> {return x2._2;}) );
System.out.println("reduce =>" + pdata1.reduce( (x2, y2) -> {return new Tuple2("sum of all elements", (x2._2+ y2._2) );}) );
}
// Build a JavaRDD from a List of Strings, convert it to a JavaPairRDD and apply reduceByKey
static void test8(JavaSparkContext sc) {
List<String> data1 = new ArrayList<String>();
data1.add("ab");
data1.add("abcd");
data1.add("ab");
data1.add("cd");
JavaRDD<String> pdata = sc.parallelize(data1);
JavaPairRDD<String, Integer> pdata1 = pdata.mapToPair(x-> {return new Tuple2(x, 1);});
System.out.println("mapToPair==>"+pdata1.collect());
JavaPairRDD<String, Integer> pdata2 = pdata1.reduceByKey( (x, y)-> { return (x+y); } );
System.out.println("reduceByKey==>"+pdata2.collect() );
}
// Build a JavaPairRDD with sc.parallelizePairs, apply combineByKey, collect the result as a Map and print it
static void test9(JavaSparkContext sc) {
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2("a", 1));
data1.add(new Tuple2("b", 1));
data1.add(new Tuple2("a", 10));
data1.add(new Tuple2("c", 9));
JavaPairRDD<String, Integer> pdata = sc.parallelizePairs(data1);
Function<Integer, AvgCount2> createCombiner = new Function<Integer, AvgCount2>() {
public AvgCount2 call(Integer x) {
return new AvgCount2(x, 1);
}
};
Function2<AvgCount2, Integer, AvgCount2> mergeValue =
new Function2<AvgCount2, Integer, AvgCount2>() {
public AvgCount2 call(AvgCount2 a, Integer x) {
a.total_ += x;
a.num_ += 1;
return a;
}
};
Function2<AvgCount2, AvgCount2, AvgCount2> mergeCombiner =
new Function2<AvgCount2, AvgCount2, AvgCount2>() {
public AvgCount2 call(AvgCount2 a, AvgCount2 b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
JavaPairRDD<String, AvgCount2> avgCounts = pdata.combineByKey(createCombiner, mergeValue, mergeCombiner);
Map<String, AvgCount2> countMap = avgCounts.collectAsMap();
for(Entry<String, AvgCount2> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
}
// Convert two Lists of Tuple2 to JavaPairRDDs with sc.parallelizePairs and apply leftOuterJoin
static void test10(JavaSparkContext sc) {
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2("x", 10));
data1.add(new Tuple2("a", 1));
data1.add(new Tuple2("b", 1));
List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
data2.add(new Tuple2("a", "aa"));
data2.add(new Tuple2("b", "bb"));
JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
JavaPairRDD<String, String> pdata2 = sc.parallelizePairs(data2);
pdata1.sortByKey(true); // RDDs are immutable, so the sorted RDD returned here is discarded; this call has no effect on pdata1
JavaPairRDD<String, Tuple2<Integer,Optional<String>>> result = pdata1.leftOuterJoin(pdata2);
System.out.println("pdata1==>"+pdata1.collect());
System.out.println("pdata2==>"+pdata2.collect());
System.out.println("result==>"+result.collect());
}
// Read test.json into memory, parse each line into a Person1 object partition by partition, and save the formatted output to HDFS
static void test11(JavaSparkContext sc) {
String dir = "formatted-out";
JavaRDD<String> input = sc.textFile("file:///tmp/test.json", 5).persist(StorageLevel.MEMORY_ONLY_2());
JavaRDD<Person1> result = input.mapPartitions(new ParseJson());
System.out.println("persons from json ===>"+result.collect());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
delete_dir(dir);
//mkdir_dir();
formatted.saveAsTextFile(dir);
}
// Convert a List of Tuple2 to a key/value JavaPairRDD, turn it into Text/IntWritable pairs with ConvertToWritableTypes,
// and save it using SequenceFileOutputFormat
static void test12(JavaSparkContext sc) {
String dir = "sequence-write";
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2("a",2));
data1.add(new Tuple2("c",4));
data1.add(new Tuple2("c",6));
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data1);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
delete_dir(dir);
System.out.println("Native Values before ==>"+data1.toString());
result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
System.out.println("Saved as SequenceFileOutputFormat.class");
}
// Read sequence-format data stored as Writable types and convert it back to native values
static void test13(JavaSparkContext sc) {
String fileName = "sequence-write";
JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
System.out.println("Native Values after ====>"+result.collect());
}
// Read and print data from a Hive table using HiveContext (deprecated as of Spark 2.0)
static void test14(JavaSparkContext sc) {
HiveContext ctx = new HiveContext(sc);
Dataset<Row> rows = ctx.sql("select * from default.test_table");
Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
@Override
public String call(Row row) throws Exception {
//return "Key : "+row.get(0) + ", Value : "+row.get(1);
return "Value : "+row.get(0);
}
}, Encoders.STRING());
System.out.println("select result #1 =>"+stringsDS.toJavaRDD().collect());
}
// Read and print data from a Hive table using SparkSession (the only supported approach from Spark 2.0 onward)
static void test15() {
SparkSession session = new SparkSession.Builder().appName("SparkJoinExample").master("local").enableHiveSupport().getOrCreate();
Dataset<Row> dset = session.sql("select * from default.test_table");
System.out.println("select result #2 =>"+dset.toJavaRDD().collect());
}
// Read and print data from a Hive table using SQLContext (deprecated as of Spark 2.0)
static void test16(JavaSparkContext sc) {
SQLContext ctx = new SQLContext(sc);
Dataset<Row> rows = ctx.sql("select * from default.test_table");
Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
@Override
public String call(Row row) throws Exception {
//return "Key : "+row.get(0) + ", Value : "+row.get(1);
return "Value : "+row.get(0);
}
}, Encoders.STRING());
System.out.println("select result #3 =>"+stringsDS.toJavaRDD().collect());
}
// Example of using an accumulator variable
static void test17(JavaSparkContext sc) {
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
JavaRDD<String> callSigns = lines.flatMap(
new FlatMapFunction<String, String>() {
public Iterator<String> call(String line) {
if(line.equals("")) {
blankLines.add(1);
}
return Arrays.asList(line.split(" ")).iterator();
}
});
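// flatMap is lazy, so blankLines is only updated once the saveAsTextFile action below runs;
// reading blankLines.value() before an action would still show 0.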
delete_dir("output.txt");
callSigns.saveAsTextFile("output.txt");
System.out.println("Blank lines: "+blankLines.value());
}
// Example of using an accumulator and a broadcast variable together
static void test18(JavaSparkContext sc) {
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
JavaRDD<String> callSigns = lines.flatMap(
new FlatMapFunction<String, String>() {
public Iterator<String> call(String line) {
if(line.equals("")) {
blankLines.add(1);
}
System.out.println("str in broadcasted ==>"+temp.value());
return Arrays.asList(line.split(" ")).iterator();
}
});
delete_dir("output.txt2");
callSigns.saveAsTextFile("output.txt2");
System.out.println("Blank lines: "+blankLines.value());
}
// Convert a List of Strings to a JavaRDD, convert it to doubles with mapToDouble, and apply the aggregate functions sum, mean, variance and stdev
static void test19(JavaSparkContext sc) {
JavaRDD<String> age = sc.parallelize(Arrays.asList("1","2","3","4","5"));
JavaDoubleRDD doubleAge = age.mapToDouble(new DoubleFunction<String>() {
public double call(String value) {
return Double.parseDouble(value);
}});
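// The anonymous DoubleFunction above could also be written with a method reference:
// JavaDoubleRDD doubleAge = age.mapToDouble(Double::parseDouble);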
System.out.println("sum = "+doubleAge.sum());
System.out.println("mean ="+doubleAge.mean());
System.out.println("variance ="+doubleAge.variance());
System.out.println("stdev ="+doubleAge.stdev());
}
// Tuple2<Text, IntWritable>값을 받아서 String형 key와 Integer형 value로 변환하는 클래스
static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
return new Tuple2(record._1.toString(), record._2.get());
}
}
// Converts a Tuple2 with a String key and an Integer value into a Tuple2 of Text and IntWritable
static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}
// Deletes a directory on HDFS along with the files under it
static void delete_dir(String ff) {
Configuration conf = new Configuration();
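// fs.default.name is the legacy key; recent Hadoop versions prefer fs.defaultFS, though both are still honored.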
conf.set("fs.default.name", "hdfs://mycluster");
try {
FileSystem dfs = FileSystem.get(conf);
System.out.println("Home Path : " + dfs.getHomeDirectory());
System.out.println("Work Path : " + dfs.getWorkingDirectory());
Path dir = new Path (ff);
if(dfs.exists(dir)) {
dfs.delete(dir, true);
}
} catch (Exception e) {
System.out.println("delete dir error ==>"+e);
}
}
// Creates a directory on HDFS
static void mkdir_dir() {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://mycluster");
try {
FileSystem dfs = FileSystem.get(conf);
Path dir = new Path ("formatted-out");
if( ! dfs.exists(dir)) {
dfs.mkdirs(dir);
}
} catch (Exception e) {
System.out.println("mk dir error ==>"+e);
}
}
}
// Converts JSON lines into an iterator of Person1 objects
class ParseJson implements FlatMapFunction<Iterator<String>, Person1> {
public Iterator<Person1> call(Iterator<String> lines) throws Exception {
ArrayList<Person1> people = new ArrayList<Person1>();
Gson mapper = new Gson();
while (lines.hasNext()) {
String line = lines.next();
try {
System.out.println("line => "+line);
Person1 person1 = mapper.fromJson(line, Person1.class);
System.out.println("person1=>"+person1);
people.add(person1);
} catch (Exception e) {
// skip lines that fail to parse
}
}
return people.iterator();
}
}
// Converts Person1 objects into an iterator of Strings
class WriteJson implements FlatMapFunction<Iterator<Person1>, String> {
public Iterator<String> call(Iterator<Person1> people) throws Exception {
ArrayList<String> text = new ArrayList<String>();
while(people.hasNext()) {
Person1 person = people.next();
text.add("new string =>"+person.toString());
}
return text.iterator();
}
}
class Person1 implements Serializable {
String name;
int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Person1 [name=" + name + ", age=" + age + "]";
}
}
// Helper class for computing an average (1)
class AvgCount implements Serializable {
private static final long serialVersionUID = 134444L;
public AvgCount (int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
// Helper class for computing an average (2)
class AvgCount2 implements Serializable {
private static final long serialVersionUID = -1683922668212126392L;
public AvgCount2(int total, int num) { total_= total; num_=num;}
public int total_;
public int num_;
public float avg() {
return total_ / (float) num_;
}
}