메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


spark 테스트 프로그램 몇개

package com.gooper.test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaSparkPi {
	
	static final SparkConf sparkConf=new SparkConf().setAppName("JavaSparkPi");
	static {
		sparkConf.setMaster("spark://gsda1:7077,gsda2:7077");
		sparkConf.setMaster("local[*]");

	}
    
	static final JavaSparkContext jsc=new JavaSparkContext(sparkConf);
	static  final Broadcast<List<String>> temp = jsc.broadcast(Arrays.asList("hello world", "", "hi park", "", "sss ff"));

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start................");

	// PI값 구하기
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map((z) ->  {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        if(x * x + y * y <= 1)  return 1; else return 0;
    }).reduce((s, s2) -> (s + s2));

    System.out.println("Pi is roughly " + 4.0 * count / n);

    
    System.out.println("=========== test start =================================");
    test(jsc);
    System.out.println("=========== test end =================================");
    
    System.out.println("=========== test2 start =================================");
    test2(jsc);
    System.out.println("=========== test2 end =================================");
    
    System.out.println("=========== test3 start =================================");
    test3(jsc);
    System.out.println("=========== test3 end =================================");

    System.out.println("=========== test4 start =================================");
    test4(jsc);
    System.out.println("=========== test4 end =================================");

    System.out.println("=========== test5 start =================================");
    test5(jsc);
    System.out.println("=========== test5 end =================================");

    System.out.println("=========== test6 start =================================");
    test6(jsc);
    System.out.println("=========== test6 end =================================");

    System.out.println("=========== test7 start =================================");
    test7(jsc);
    System.out.println("=========== test7 end =================================");

    System.out.println("=========== test8 start =================================");
    test8(jsc);
    System.out.println("=========== test8 end ================================="); 

    System.out.println("=========== test9 start =================================");
    test9(jsc);
    System.out.println("=========== test9 end =================================");

    System.out.println("=========== test10 start =================================");
    test10(jsc);
    System.out.println("=========== test10 end =================================");

    System.out.println("=========== test11 start =================================");
    test11(jsc);
    System.out.println("=========== test11 end =================================");

    System.out.println("=========== test12 start =================================");
    test12(jsc);
    System.out.println("=========== test12 end =================================");

    System.out.println("=========== test13 start =================================");
    test13(jsc);
    System.out.println("=========== test13 end =================================");

    System.out.println("=========== test14 start =================================");
    test14(jsc);
    System.out.println("=========== test14 end =================================");

    System.out.println("=========== test15 start =================================");
    test15();
    System.out.println("=========== test15 end =================================");

    System.out.println("=========== test16 start =================================");
    test16(jsc);
    System.out.println("=========== test16 end =================================");

    System.out.println("=========== test17 start =================================");
    test17(jsc);
    System.out.println("=========== test17 end =================================");

    System.out.println("=========== test18 start =================================");
    test18(jsc);
    System.out.println("=========== test18 end =================================");

    System.out.println("=========== test19 start =================================");
    test19(jsc);
    System.out.println("=========== test19 end =================================");
    
    jsc.stop();
    jsc.close();
    
    System.out.println("end................");
  }
  
  // List를 RDD로 변환하고 map연상을 통해서 x*x한 값으로 구성된 JavaRDD를 만들어서 화면에 출력 
  static void test (JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  JavaRDD<Integer> result = rdd.map((x) -> {return x*x;});
	  
	  System.out.println("result ==>"+StringUtils.join(result.collect(), ","));
  }
  
  // 문자열을 space로 분리(람다식을 이용함)하여 화면에 출력
  static void test2 (JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi"));
	  JavaRDD<String> words = lines.flatMap( (line) -> {return Arrays.asList(line.split(" ")).iterator();});
	  
	  System.out.println("result ==>"+StringUtils.join(words.collect(), ","));
  }
  
  
  // RDD의 함수인 disticnt, union, intersection, subtract, cartesian, countByValue를 적용하여 화면에 결과 출력 
  static void test3 (JavaSparkContext sc) {
	  JavaRDD<String> data1 = sc.parallelize(Arrays.asList("coffee", "coffee", "panda", "monkey", "tea"));
	  JavaRDD<String> data2 = sc.parallelize(Arrays.asList("coffee", "monkey", "kitty"));
	  JavaRDD<Integer> data3 = sc.parallelize(Arrays.asList(1,2,3));
	  
	  System.out.println("distinct ==>"+data1.distinct().collect());
	  System.out.println("union ==>"+data1.union(data2).collect());
	  System.out.println("intersection ==>"+data1.intersection(data2).collect());
	  System.out.println("subtract ==>"+data1.subtract(data2).collect());
	  System.out.println("cartesion ==>"+data1.cartesian(data3).collect());
	  System.out.println("countByValue ==>"+data1.countByValue());
  }
  
  
  // persist를 사용하여 reduce, fold등를 적용해보고, double형의 RDD로 변환하여 mean값을 구하여 출력함
  static void test4(JavaSparkContext sc) {
	  JavaRDD<Integer> data1 = sc.parallelize(Arrays.asList(1,2,3,4));
	  JavaRDD<Integer> data2 = sc.parallelize(Arrays.asList(3,4,5));
	  
	  List<Integer> data3 = new ArrayList<Integer>();
	  data3.add(1);
	  data3.add(2);
	  data3.add(3);
	  data3.add(4);
	  
	  JavaRDD<Integer> map = data1.map(x -> x+1);
	  map.persist(StorageLevel.MEMORY_AND_DISK());
	  
	  Function2<Integer, Integer, Integer> reduce = new Function2<Integer, Integer, Integer>() {
		 public Integer call(Integer x, Integer y) {
			 return x+y;
		 }
	  };
	  
	  DoubleFunction<Integer> df = new DoubleFunction<Integer>() {
		  public double call(Integer x) {
			  return (double) x;
		  }
	  };
	  
	  System.out.println("map==>"+data1.map(x -> x+1).reduce((x, y)->{return x+y;}));
	  
	  System.out.println("fold==>"+map.fold(0, reduce));
	  
	  map.foreach((x)->System.out.println(x));
	  
	  JavaDoubleRDD result = data1.mapToDouble((x) -> x);
	  System.out.println("mean ===>"+result.mean());
	  result.foreach((x) -> System.out.println(x));
	  
	  System.out.println("--------------------------");
	  
	  JavaDoubleRDD result2 = map.mapToDouble(df);
  	  System.out.println("mean by DoubleFuntion()===>"+result2.mean());
	  result2.foreach((x) -> System.out.println(x)); 

  }
  
  
  // 숫자형의 JavaRDD를 이용하여 aggregate함수를 사용하는 예제
  static void test5(JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  
	  Function2<AvgCount, Integer, AvgCount> addAndCount =
			  new Function2<AvgCount, Integer, AvgCount> (){
				private static final long serialVersionUID = 122222L;

				public AvgCount call(AvgCount a, Integer x) {
			  		a.total += x;
			  		a.num += 1;
			  		return a;
			  	}
			  };

	  Function2<AvgCount, AvgCount, AvgCount> combine =
	  new Function2<AvgCount, AvgCount, AvgCount> (){
		private static final long serialVersionUID = 11111L;

		public AvgCount call(AvgCount a, AvgCount b) {
	  		a.total += b.total;
	  		a.num += b.num;
	  		return a;
	  	}
	  };

	  AvgCount initial = new AvgCount(0, 0);
	  AvgCount result = rdd.aggregate(initial, addAndCount, combine);
      System.out.println(result.avg());
  }
  
  
  // 문자열 RDD를 key/value형태로 바꾸고 key를 기준을 작동하는 sortByKey, reduceByKey, groupByKey, sortByKey를 적용해보는 예제
  static void test6(JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi", "hi park", "dk"));
	  
	PairFunction<String, String, Integer> keyData = 
		new PairFunction<String, String, Integer>() {
		public Tuple2<String, Integer> call(String x) {
			return new Tuple2(x.split(" ")[0], x.length());
		}
	};
	
	JavaPairRDD<String, Integer> pairs = lines.mapToPair(keyData);
	pairs.foreach(x->System.out.println(x));
	
	JavaPairRDD<String, Integer> reduceByKey = pairs.reduceByKey(   (x, y) -> { return (x+y);} );
	JavaPairRDD<String, Iterable<Integer>> groupByKey = pairs.groupByKey();

	JavaPairRDD<String, Integer> sortByKey = pairs.sortByKey(false);
	
	System.out.println("reduceByKey =>"+reduceByKey.collect() );
	
	System.out.println("groupByKey =>"+groupByKey.collect() );
	
	System.out.println("sortBykey => "+sortByKey.collect() );
  }

  //   Tuple2형의 RDD를 이용하여 mapToPair를 적용하여 JavaPairRDD를 만들면서 각각의 RDD를 출력해보는 예제
  //  두개의 JavaPairRDD를 이용하여 subtractByKey, subtract, rightOuterJoin, leftOuterJoin, cogroup를 적용해보는 예제 
  static void test7(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  data2.add(new Tuple2("c",4));
	  
	  JavaRDD<Tuple2<String, Integer>> pdataa1 = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdataa11 = pdataa1.mapToPair(  (x) -> {return new Tuple2(x._1, x._2);}  );
	  
	  System.out.println("pdataa1 ==>"+pdataa1);
	  System.out.println("pdataa11 ==>"+pdataa11);
	  
	  System.out.println("pdataa1 ==>"+pdataa1.collect());
	  System.out.println("pdataa11 ==>"+pdataa11.collect());
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, Integer> pdata2 = sc.parallelizePairs(data2);
	  
	  System.out.println("pdata1 ==>"+pdata1.collect());
	  System.out.println("pdata2 ==>"+pdata2.collect());

	System.out.println("subtractByKey =>"+pdataa11.subtractByKey(pdata2).collect());
	System.out.println("subtract =>"+pdata1.subtract(pdata2).collect());
	
	System.out.println("join =>"+pdata1.join(pdata2).collect());
	System.out.println("rightOuterJoin =>"+pdata1.rightOuterJoin(pdata2).collect());
	System.out.println("leftOuterJoin =>"+pdata1.leftOuterJoin(pdata2).collect());
	
	System.out.println("cogroup =>"+pdata1.cogroup(pdata2).collect());

	Function<Integer, Integer> ff = new Function<Integer, Integer>() {
		private static final long serialVersionUID = 11234L;
		int sum = 100;
		public Integer call (Integer x) {
			sum += x;
			return sum;
		}
	};
	
	System.out.println("mapValues =>" + pdata1.mapValues(ff).collect()); 
	
	//System.out.println("mapValues 1=>" + pdata1.reduce( (x2._2) -> {return x2._2;})           );
	System.out.println("reduce =>" + pdata1.reduce( (x2, y2) -> {return new Tuple2("sum of all elements", (x2._2+ y2._2) );})           );
  }
  
  // String형의 List를 JavaRDD를 만들고 이를 JavaPairRDD로 변환후 reduceByKey를 적용하는 예제
  static void test8(JavaSparkContext sc) {
	  List<String> data1 = new ArrayList<String>();
	  data1.add("ab");
	  data1.add("abcd");
	  data1.add("ab");
	  data1.add("cd");
	  
	  JavaRDD<String> pdata = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdata1 = pdata.mapToPair(x-> {return new Tuple2(x, 1);});
	  System.out.println("mapToPair==>"+pdata1.collect());
	  
	  JavaPairRDD<String, Integer> pdata2 = pdata1.reduceByKey( (x, y)-> { return (x+y); } );
	  System.out.println("reduceByKey==>"+pdata2.collect() );

  }
  
  //   sc.parallelizePairs를 이용하여  JavaPairRDD를 만들고 combineByKey 적용하고 Map으로 collect후에 출력하는 예제
  static void test9(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));
	  data1.add(new Tuple2("a", 10));
	  data1.add(new Tuple2("c", 9));
	  
	  JavaPairRDD<String, Integer> pdata = sc.parallelizePairs(data1);
	  
	  Function<Integer, AvgCount2> createCombiner = new Function<Integer, AvgCount2>() {
			public AvgCount2 call(Integer x) {
				return new AvgCount2(x, 1);
			}
		};

		Function2<AvgCount2, Integer, AvgCount2> mergeValue =
			new Function2<AvgCount2, Integer, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, Integer x) {
				a.total_ += x;
				a.num_ += 1;
				return a;
			}
		};

		Function2<AvgCount2, AvgCount2, AvgCount2> mergeCombiner =
			new Function2<AvgCount2, AvgCount2, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, AvgCount2 b) {
				a.total_ += b.total_;
				a.num_ += b.num_;
				return a;
			}
		};
	  
	  JavaPairRDD<String, AvgCount2> avgCounts =  pdata.combineByKey(createCombiner, mergeValue, mergeCombiner);
	  Map<String, AvgCount2> countMap = avgCounts.collectAsMap();

	  for(Entry<String, AvgCount2> entry : countMap.entrySet()) {
	  	System.out.println(entry.getKey() + ":" + entry.getValue().avg());
	  }
  
  }
  
  // 두개의 Tuple2형 List를 이용하여 sc.paralledizePairs를 이용하여 JavaPairRDD로 변환하고 leftOuterJoin를 적용하는 예제
  static void test10(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("x", 10));	  
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));

	  
	  
	  List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
	  data2.add(new Tuple2("a", "aa"));
	  data2.add(new Tuple2("b", "bb"));
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, String> pdata2 = sc.parallelizePairs(data2);
	  
	  pdata1.sortByKey(true);
	  
	 JavaPairRDD<String, Tuple2<Integer,Optional<String>>> result = pdata1.leftOuterJoin(pdata2);

	 System.out.println("pdata1==>"+pdata1.collect());
	 System.out.println("pdata2==>"+pdata2.collect());
	 
	 System.out.println("result==>"+result.collect());
  
  }
  
  // test.json파일을 일거서 메모리에 적재하고 데이타를 파싱하여 Person1 객체에 담아 partition별로 map작업을 수행고 HDFS에 저장하는 예제  
  static void test11(JavaSparkContext sc) {
	  String dir = "formatted-out";
	  
   	 JavaRDD<String> input = sc.textFile("file:///tmp/test.json", 5).persist(StorageLevel.MEMORY_ONLY_2());
   	 
   	 JavaRDD<Person1> result = input.mapPartitions(new ParseJson());
   	 
   	 System.out.println("persons from json ===>"+result.collect());
   	 
   	 JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
   	 
   	 delete_dir(dir);
   	 //mkdir_dir();
   	
   	 formatted.saveAsTextFile(dir);
  
  }
  
  // Tuple2형의 List를 key/value형의 JavaPairRDD로 변환하고 ConvertToWritableTypes 객체를 이용하여 IntWritable형으로 변환하여
  // SequenceFileOutputFormat으로 저장하는 예제
  static void test12(JavaSparkContext sc) {
	  String dir = "sequence-write";
	  
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data1);
	  
	  JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
	  
	  delete_dir(dir);
	  
	  System.out.println("Native Values before ==>"+data1.toString());
	   	 
	  result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class,	SequenceFileOutputFormat.class);
	  System.out.println("Saved as SequenceFileOutputFormat.class");
  }
  
  // Writable형의 저장되어 있는 sequence format의 data를 읽어서 원래의 값으로 변환하는 예제
  static void test13(JavaSparkContext sc) {
	  String fileName = "sequence-write";
	  JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
	  JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
	  
	  System.out.println("Native Values after ====>"+result.collect());
  }
  
  
  // HIveContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test14(JavaSparkContext sc) {
	  HiveContext ctx = new HiveContext(sc);
	  
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #1 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // SparkSession을 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0이상에서는 이것만 유효함)
  static void test15() {
	  SparkSession session = new SparkSession.Builder().appName("SparkJoinExample").master("local").enableHiveSupport().getOrCreate();
      Dataset<Row> dset = session.sql("select * from default.test_table");
      
	  System.out.println("select result #2 =>"+dset.toJavaRDD().collect());
  }

  // SQLContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test16(JavaSparkContext sc) {
	  SQLContext ctx = new SQLContext(sc);
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #3 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // accumulator 변수를 사용하는 예제
  static void test17(JavaSparkContext sc) {
	  final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
				
	  			delete_dir("output.txt");
	  
				callSigns.saveAsTextFile("output.txt");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // accumulator와 broadcast변수를 동시에 사용하는 예제
  static void test18(JavaSparkContext sc) {

      final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					  System.out.println("str in broadcasted  ==>"+temp.value());
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
	  
	  			delete_dir("output.txt2");
				
				callSigns.saveAsTextFile("output.txt2");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // 문자형 List를 JavaRDD로 변환하고 mapToDobule를 이용하여 double형으로 변환하고 집계함수인 sum, mean, variance, stdev등을 적용해보는 예제
  static void test19(JavaSparkContext sc) {
	  JavaRDD<String> age = sc.parallelize(Arrays.asList("1","2","3","4","5"));
	  JavaDoubleRDD doubleAge = age.mapToDouble(new DoubleFunction<String>() {
		  public double call(String value) {
			  return Double.parseDouble(value);
		  }});
	  
	  System.out.println("sum = "+doubleAge.sum());
	  System.out.println("mean ="+doubleAge.mean());
	  System.out.println("variance ="+doubleAge.variance());
	  System.out.println("stdev ="+doubleAge.stdev());
  }
 
  // Tuple2<Text, IntWritable>값을 받아서 String형 key와 Integer형 value로 변환하는 클래스
  static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
	  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
		  return new Tuple2(record._1.toString(), record._2.get());
	  }
  }
  
  // String형의 key와 Integer형의 Tuple2를 받아서 Text형의 key와 IntWritable형의 Tuple2값으로 변환하는 클래스  
  static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
	  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
		  return new Tuple2(new Text(record._1), new IntWritable(record._2));
	  }
  }
  
  // HDFS상의 폴더밑 하위 파일을 지우는 함수
  static void delete_dir(String ff) {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name",  "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
	  
		  System.out.println("Home Path : " + dfs.getHomeDirectory());
		  System.out.println("Work Path : " + dfs.getWorkingDirectory());
		  
		  Path dir = new Path (ff);
		  if(dfs.exists(dir)) {
			  dfs.delete(dir, true);
		  }
	  } catch (Exception e) {
		  System.out.println("delete dir error ==>"+e);
	  }
  }
  
  // HDFS상의 폴더를 생성하는 함수
  static void mkdir_dir() {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name", "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
		  
		  Path dir = new Path ("formatted-out");
		  if( ! dfs.exists(dir)) {
			  dfs.mkdirs(dir);
		  }
	  } catch (Exception e) {
		  System.out.println("mk dir error ==>"+e);
	  }
  }
  
  
  
}


// json데이타를 Person1형의 iterator로 변환하는 클래스
class ParseJson implements FlatMapFunction<Iterator<String>, Person1> {
	public Iterator<Person1> call(Iterator<String> lines) throws Exception {
		ArrayList<Person1> people = new ArrayList<Person1>();
		
		Gson mapper = new Gson();
		while (lines.hasNext()) {
			String line = lines.next();
			try {
				System.out.println("line => "+line);
				Person1 person1 = mapper.fromJson(line, Person1.class);
				System.out.println("person1=>"+person1);
				people.add(person1);
			} catch (Exception e) {
				// 무시함
			}
		}
		return people.iterator();
	}
}

// Person1형의 데이타를 String형의 iterator로 변환하는 클래스
class WriteJson implements FlatMapFunction<Iterator<Person1>, String> {
	public Iterator<String> call(Iterator<Person1> people) throws Exception {
		ArrayList<String> text = new ArrayList<String>();
		while(people.hasNext()) {
			Person1 person = people.next();
			text.add("new string =>"+person.toString());
		}
		return text.iterator();
	}
}

class Person1 implements Serializable {
	String name;
	int age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "Person1 [name=" + name + ", age=" + age + "]";
	}
	
}

// 평균값을 구하는 클래스 1
class AvgCount implements Serializable {
	private static final long serialVersionUID = 134444L;
	public AvgCount (int total, int num) {
		this.total = total;
		this.num = num;
	}
	
	public int total;
	public int num;
	public double avg() {
		return total / (double) num;
	}
}


//평균값을 구하는 클래스 2
class AvgCount2 implements Serializable {
	private static final long serialVersionUID = -1683922668212126392L;
	public AvgCount2(int total, int num) { total_= total; num_=num;}
	
	public int total_;
	public int num_;
	public float avg() {
		return total_ / (float) num_;
	}
}


번호 제목 글쓴이 날짜 조회 수
» spark 2.0.0의 api를 이용하는 예제 프로그램 총관리자 2017.03.15 199
20 Scala에서 countByWindow를 이용하기(예제) 총관리자 2018.03.08 235
19 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 총관리자 2017.07.26 260
18 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 총관리자 2017.04.19 284
17 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 총관리자 2017.04.26 292
16 spark-submit으로 spark application실행하는 다양한 방법 총관리자 2016.05.25 303
15 Apache Spark와 Drools를 이용한 CEP구현 테스트 총관리자 2016.07.15 342
14 spark-sql실행시 The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH오류 발생시 조치사항 총관리자 2016.06.09 451
13 Spark 1.6.1 설치후 HA구성 총관리자 2016.05.24 455
12 java.lang.OutOfMemoryError: unable to create new native thread오류 발생지 조치사항 총관리자 2016.10.17 467
11 spark-submit 실행시 "java.lang.OutOfMemoryError: Java heap space"발생시 조치사항 총관리자 2018.02.01 517
10 spark client프로그램 기동시 "Error initializing SparkContext"오류 발생할때 조치사항 총관리자 2016.05.27 539
9 spark-shell실행시 "A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection."오류가 발생하는 경우 해결방법 총관리자 2016.05.20 551
8 spark-env.sh에서 사용할 수있는 항목. 총관리자 2016.05.24 567
7 kafka로 부터 메세지를 stream으로 받아 처리하는 spark샘플소스(spark의 producer와 consumer를 sbt로 컴파일 하고 서버에서 spark-submit하는 방법) 총관리자 2016.07.13 630
6 "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"오류 발생시 조치사항 총관리자 2016.05.25 1028
5 spark stream처리할때 두개의 client프로그램이 동일한 checkpoint로 접근할때 발생하는 오류 내용 총관리자 2018.01.16 1115
4 Spark 2.1.1 clustering(5대) 설치(YARN기반) 총관리자 2016.04.22 1882
3 VisualVM 1.3.9을 이용한 spark-submit JVM 모니터링을 위한 설정및 spark-submit실행 옵션 총관리자 2016.10.28 1891
2 spark-sql실행시 Caused by: java.lang.NumberFormatException: For input string: "0s" 오류발생시 조치사항 총관리자 2016.06.09 2802

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로