메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


spark 테스트 프로그램 몇개

package com.gooper.test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaSparkPi {
	
	static final SparkConf sparkConf=new SparkConf().setAppName("JavaSparkPi");
	static {
		sparkConf.setMaster("spark://gsda1:7077,gsda2:7077");
		sparkConf.setMaster("local[*]");

	}
    
	static final JavaSparkContext jsc=new JavaSparkContext(sparkConf);
	static  final Broadcast<List<String>> temp = jsc.broadcast(Arrays.asList("hello world", "", "hi park", "", "sss ff"));

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start................");

	// PI값 구하기
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map((z) ->  {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        if(x * x + y * y <= 1)  return 1; else return 0;
    }).reduce((s, s2) -> (s + s2));

    System.out.println("Pi is roughly " + 4.0 * count / n);

    
    System.out.println("=========== test start =================================");
    test(jsc);
    System.out.println("=========== test end =================================");
    
    System.out.println("=========== test2 start =================================");
    test2(jsc);
    System.out.println("=========== test2 end =================================");
    
    System.out.println("=========== test3 start =================================");
    test3(jsc);
    System.out.println("=========== test3 end =================================");

    System.out.println("=========== test4 start =================================");
    test4(jsc);
    System.out.println("=========== test4 end =================================");

    System.out.println("=========== test5 start =================================");
    test5(jsc);
    System.out.println("=========== test5 end =================================");

    System.out.println("=========== test6 start =================================");
    test6(jsc);
    System.out.println("=========== test6 end =================================");

    System.out.println("=========== test7 start =================================");
    test7(jsc);
    System.out.println("=========== test7 end =================================");

    System.out.println("=========== test8 start =================================");
    test8(jsc);
    System.out.println("=========== test8 end ================================="); 

    System.out.println("=========== test9 start =================================");
    test9(jsc);
    System.out.println("=========== test9 end =================================");

    System.out.println("=========== test10 start =================================");
    test10(jsc);
    System.out.println("=========== test10 end =================================");

    System.out.println("=========== test11 start =================================");
    test11(jsc);
    System.out.println("=========== test11 end =================================");

    System.out.println("=========== test12 start =================================");
    test12(jsc);
    System.out.println("=========== test12 end =================================");

    System.out.println("=========== test13 start =================================");
    test13(jsc);
    System.out.println("=========== test13 end =================================");

    System.out.println("=========== test14 start =================================");
    test14(jsc);
    System.out.println("=========== test14 end =================================");

    System.out.println("=========== test15 start =================================");
    test15();
    System.out.println("=========== test15 end =================================");

    System.out.println("=========== test16 start =================================");
    test16(jsc);
    System.out.println("=========== test16 end =================================");

    System.out.println("=========== test17 start =================================");
    test17(jsc);
    System.out.println("=========== test17 end =================================");

    System.out.println("=========== test18 start =================================");
    test18(jsc);
    System.out.println("=========== test18 end =================================");

    System.out.println("=========== test19 start =================================");
    test19(jsc);
    System.out.println("=========== test19 end =================================");
    
    jsc.stop();
    jsc.close();
    
    System.out.println("end................");
  }
  
  // List를 RDD로 변환하고 map연상을 통해서 x*x한 값으로 구성된 JavaRDD를 만들어서 화면에 출력 
  static void test (JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  JavaRDD<Integer> result = rdd.map((x) -> {return x*x;});
	  
	  System.out.println("result ==>"+StringUtils.join(result.collect(), ","));
  }
  
  // 문자열을 space로 분리(람다식을 이용함)하여 화면에 출력
  static void test2 (JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi"));
	  JavaRDD<String> words = lines.flatMap( (line) -> {return Arrays.asList(line.split(" ")).iterator();});
	  
	  System.out.println("result ==>"+StringUtils.join(words.collect(), ","));
  }
  
  
  // RDD의 함수인 disticnt, union, intersection, subtract, cartesian, countByValue를 적용하여 화면에 결과 출력 
  static void test3 (JavaSparkContext sc) {
	  JavaRDD<String> data1 = sc.parallelize(Arrays.asList("coffee", "coffee", "panda", "monkey", "tea"));
	  JavaRDD<String> data2 = sc.parallelize(Arrays.asList("coffee", "monkey", "kitty"));
	  JavaRDD<Integer> data3 = sc.parallelize(Arrays.asList(1,2,3));
	  
	  System.out.println("distinct ==>"+data1.distinct().collect());
	  System.out.println("union ==>"+data1.union(data2).collect());
	  System.out.println("intersection ==>"+data1.intersection(data2).collect());
	  System.out.println("subtract ==>"+data1.subtract(data2).collect());
	  System.out.println("cartesion ==>"+data1.cartesian(data3).collect());
	  System.out.println("countByValue ==>"+data1.countByValue());
  }
  
  
  // persist를 사용하여 reduce, fold등를 적용해보고, double형의 RDD로 변환하여 mean값을 구하여 출력함
  static void test4(JavaSparkContext sc) {
	  JavaRDD<Integer> data1 = sc.parallelize(Arrays.asList(1,2,3,4));
	  JavaRDD<Integer> data2 = sc.parallelize(Arrays.asList(3,4,5));
	  
	  List<Integer> data3 = new ArrayList<Integer>();
	  data3.add(1);
	  data3.add(2);
	  data3.add(3);
	  data3.add(4);
	  
	  JavaRDD<Integer> map = data1.map(x -> x+1);
	  map.persist(StorageLevel.MEMORY_AND_DISK());
	  
	  Function2<Integer, Integer, Integer> reduce = new Function2<Integer, Integer, Integer>() {
		 public Integer call(Integer x, Integer y) {
			 return x+y;
		 }
	  };
	  
	  DoubleFunction<Integer> df = new DoubleFunction<Integer>() {
		  public double call(Integer x) {
			  return (double) x;
		  }
	  };
	  
	  System.out.println("map==>"+data1.map(x -> x+1).reduce((x, y)->{return x+y;}));
	  
	  System.out.println("fold==>"+map.fold(0, reduce));
	  
	  map.foreach((x)->System.out.println(x));
	  
	  JavaDoubleRDD result = data1.mapToDouble((x) -> x);
	  System.out.println("mean ===>"+result.mean());
	  result.foreach((x) -> System.out.println(x));
	  
	  System.out.println("--------------------------");
	  
	  JavaDoubleRDD result2 = map.mapToDouble(df);
  	  System.out.println("mean by DoubleFuntion()===>"+result2.mean());
	  result2.foreach((x) -> System.out.println(x)); 

  }
  
  
  // 숫자형의 JavaRDD를 이용하여 aggregate함수를 사용하는 예제
  static void test5(JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  
	  Function2<AvgCount, Integer, AvgCount> addAndCount =
			  new Function2<AvgCount, Integer, AvgCount> (){
				private static final long serialVersionUID = 122222L;

				public AvgCount call(AvgCount a, Integer x) {
			  		a.total += x;
			  		a.num += 1;
			  		return a;
			  	}
			  };

	  Function2<AvgCount, AvgCount, AvgCount> combine =
	  new Function2<AvgCount, AvgCount, AvgCount> (){
		private static final long serialVersionUID = 11111L;

		public AvgCount call(AvgCount a, AvgCount b) {
	  		a.total += b.total;
	  		a.num += b.num;
	  		return a;
	  	}
	  };

	  AvgCount initial = new AvgCount(0, 0);
	  AvgCount result = rdd.aggregate(initial, addAndCount, combine);
      System.out.println(result.avg());
  }
  
  
  // 문자열 RDD를 key/value형태로 바꾸고 key를 기준을 작동하는 sortByKey, reduceByKey, groupByKey, sortByKey를 적용해보는 예제
  static void test6(JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi", "hi park", "dk"));
	  
	PairFunction<String, String, Integer> keyData = 
		new PairFunction<String, String, Integer>() {
		public Tuple2<String, Integer> call(String x) {
			return new Tuple2(x.split(" ")[0], x.length());
		}
	};
	
	JavaPairRDD<String, Integer> pairs = lines.mapToPair(keyData);
	pairs.foreach(x->System.out.println(x));
	
	JavaPairRDD<String, Integer> reduceByKey = pairs.reduceByKey(   (x, y) -> { return (x+y);} );
	JavaPairRDD<String, Iterable<Integer>> groupByKey = pairs.groupByKey();

	JavaPairRDD<String, Integer> sortByKey = pairs.sortByKey(false);
	
	System.out.println("reduceByKey =>"+reduceByKey.collect() );
	
	System.out.println("groupByKey =>"+groupByKey.collect() );
	
	System.out.println("sortBykey => "+sortByKey.collect() );
  }

  //   Tuple2형의 RDD를 이용하여 mapToPair를 적용하여 JavaPairRDD를 만들면서 각각의 RDD를 출력해보는 예제
  //  두개의 JavaPairRDD를 이용하여 subtractByKey, subtract, rightOuterJoin, leftOuterJoin, cogroup를 적용해보는 예제 
  static void test7(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  data2.add(new Tuple2("c",4));
	  
	  JavaRDD<Tuple2<String, Integer>> pdataa1 = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdataa11 = pdataa1.mapToPair(  (x) -> {return new Tuple2(x._1, x._2);}  );
	  
	  System.out.println("pdataa1 ==>"+pdataa1);
	  System.out.println("pdataa11 ==>"+pdataa11);
	  
	  System.out.println("pdataa1 ==>"+pdataa1.collect());
	  System.out.println("pdataa11 ==>"+pdataa11.collect());
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, Integer> pdata2 = sc.parallelizePairs(data2);
	  
	  System.out.println("pdata1 ==>"+pdata1.collect());
	  System.out.println("pdata2 ==>"+pdata2.collect());

	System.out.println("subtractByKey =>"+pdataa11.subtractByKey(pdata2).collect());
	System.out.println("subtract =>"+pdata1.subtract(pdata2).collect());
	
	System.out.println("join =>"+pdata1.join(pdata2).collect());
	System.out.println("rightOuterJoin =>"+pdata1.rightOuterJoin(pdata2).collect());
	System.out.println("leftOuterJoin =>"+pdata1.leftOuterJoin(pdata2).collect());
	
	System.out.println("cogroup =>"+pdata1.cogroup(pdata2).collect());

	Function<Integer, Integer> ff = new Function<Integer, Integer>() {
		private static final long serialVersionUID = 11234L;
		int sum = 100;
		public Integer call (Integer x) {
			sum += x;
			return sum;
		}
	};
	
	System.out.println("mapValues =>" + pdata1.mapValues(ff).collect()); 
	
	//System.out.println("mapValues 1=>" + pdata1.reduce( (x2._2) -> {return x2._2;})           );
	System.out.println("reduce =>" + pdata1.reduce( (x2, y2) -> {return new Tuple2("sum of all elements", (x2._2+ y2._2) );})           );
  }
  
  // String형의 List를 JavaRDD를 만들고 이를 JavaPairRDD로 변환후 reduceByKey를 적용하는 예제
  static void test8(JavaSparkContext sc) {
	  List<String> data1 = new ArrayList<String>();
	  data1.add("ab");
	  data1.add("abcd");
	  data1.add("ab");
	  data1.add("cd");
	  
	  JavaRDD<String> pdata = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdata1 = pdata.mapToPair(x-> {return new Tuple2(x, 1);});
	  System.out.println("mapToPair==>"+pdata1.collect());
	  
	  JavaPairRDD<String, Integer> pdata2 = pdata1.reduceByKey( (x, y)-> { return (x+y); } );
	  System.out.println("reduceByKey==>"+pdata2.collect() );

  }
  
  //   sc.parallelizePairs를 이용하여  JavaPairRDD를 만들고 combineByKey 적용하고 Map으로 collect후에 출력하는 예제
  static void test9(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));
	  data1.add(new Tuple2("a", 10));
	  data1.add(new Tuple2("c", 9));
	  
	  JavaPairRDD<String, Integer> pdata = sc.parallelizePairs(data1);
	  
	  Function<Integer, AvgCount2> createCombiner = new Function<Integer, AvgCount2>() {
			public AvgCount2 call(Integer x) {
				return new AvgCount2(x, 1);
			}
		};

		Function2<AvgCount2, Integer, AvgCount2> mergeValue =
			new Function2<AvgCount2, Integer, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, Integer x) {
				a.total_ += x;
				a.num_ += 1;
				return a;
			}
		};

		Function2<AvgCount2, AvgCount2, AvgCount2> mergeCombiner =
			new Function2<AvgCount2, AvgCount2, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, AvgCount2 b) {
				a.total_ += b.total_;
				a.num_ += b.num_;
				return a;
			}
		};
	  
	  JavaPairRDD<String, AvgCount2> avgCounts =  pdata.combineByKey(createCombiner, mergeValue, mergeCombiner);
	  Map<String, AvgCount2> countMap = avgCounts.collectAsMap();

	  for(Entry<String, AvgCount2> entry : countMap.entrySet()) {
	  	System.out.println(entry.getKey() + ":" + entry.getValue().avg());
	  }
  
  }
  
  // 두개의 Tuple2형 List를 이용하여 sc.paralledizePairs를 이용하여 JavaPairRDD로 변환하고 leftOuterJoin를 적용하는 예제
  static void test10(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("x", 10));	  
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));

	  
	  
	  List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
	  data2.add(new Tuple2("a", "aa"));
	  data2.add(new Tuple2("b", "bb"));
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, String> pdata2 = sc.parallelizePairs(data2);
	  
	  pdata1.sortByKey(true);
	  
	 JavaPairRDD<String, Tuple2<Integer,Optional<String>>> result = pdata1.leftOuterJoin(pdata2);

	 System.out.println("pdata1==>"+pdata1.collect());
	 System.out.println("pdata2==>"+pdata2.collect());
	 
	 System.out.println("result==>"+result.collect());
  
  }
  
  // test.json파일을 일거서 메모리에 적재하고 데이타를 파싱하여 Person1 객체에 담아 partition별로 map작업을 수행고 HDFS에 저장하는 예제  
  static void test11(JavaSparkContext sc) {
	  String dir = "formatted-out";
	  
   	 JavaRDD<String> input = sc.textFile("file:///tmp/test.json", 5).persist(StorageLevel.MEMORY_ONLY_2());
   	 
   	 JavaRDD<Person1> result = input.mapPartitions(new ParseJson());
   	 
   	 System.out.println("persons from json ===>"+result.collect());
   	 
   	 JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
   	 
   	 delete_dir(dir);
   	 //mkdir_dir();
   	
   	 formatted.saveAsTextFile(dir);
  
  }
  
  // Tuple2형의 List를 key/value형의 JavaPairRDD로 변환하고 ConvertToWritableTypes 객체를 이용하여 IntWritable형으로 변환하여
  // SequenceFileOutputFormat으로 저장하는 예제
  static void test12(JavaSparkContext sc) {
	  String dir = "sequence-write";
	  
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data1);
	  
	  JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
	  
	  delete_dir(dir);
	  
	  System.out.println("Native Values before ==>"+data1.toString());
	   	 
	  result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class,	SequenceFileOutputFormat.class);
	  System.out.println("Saved as SequenceFileOutputFormat.class");
  }
  
  // Writable형의 저장되어 있는 sequence format의 data를 읽어서 원래의 값으로 변환하는 예제
  static void test13(JavaSparkContext sc) {
	  String fileName = "sequence-write";
	  JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
	  JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
	  
	  System.out.println("Native Values after ====>"+result.collect());
  }
  
  
  // HIveContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test14(JavaSparkContext sc) {
	  HiveContext ctx = new HiveContext(sc);
	  
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #1 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // SparkSession을 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0이상에서는 이것만 유효함)
  static void test15() {
	  SparkSession session = new SparkSession.Builder().appName("SparkJoinExample").master("local").enableHiveSupport().getOrCreate();
      Dataset<Row> dset = session.sql("select * from default.test_table");
      
	  System.out.println("select result #2 =>"+dset.toJavaRDD().collect());
  }

  // SQLContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test16(JavaSparkContext sc) {
	  SQLContext ctx = new SQLContext(sc);
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #3 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // accumulator 변수를 사용하는 예제
  static void test17(JavaSparkContext sc) {
	  final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
				
	  			delete_dir("output.txt");
	  
				callSigns.saveAsTextFile("output.txt");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // accumulator와 broadcast변수를 동시에 사용하는 예제
  static void test18(JavaSparkContext sc) {

      final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					  System.out.println("str in broadcasted  ==>"+temp.value());
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
	  
	  			delete_dir("output.txt2");
				
				callSigns.saveAsTextFile("output.txt2");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // 문자형 List를 JavaRDD로 변환하고 mapToDobule를 이용하여 double형으로 변환하고 집계함수인 sum, mean, variance, stdev등을 적용해보는 예제
  static void test19(JavaSparkContext sc) {
	  JavaRDD<String> age = sc.parallelize(Arrays.asList("1","2","3","4","5"));
	  JavaDoubleRDD doubleAge = age.mapToDouble(new DoubleFunction<String>() {
		  public double call(String value) {
			  return Double.parseDouble(value);
		  }});
	  
	  System.out.println("sum = "+doubleAge.sum());
	  System.out.println("mean ="+doubleAge.mean());
	  System.out.println("variance ="+doubleAge.variance());
	  System.out.println("stdev ="+doubleAge.stdev());
  }
 
  // Tuple2<Text, IntWritable>값을 받아서 String형 key와 Integer형 value로 변환하는 클래스
  static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
	  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
		  return new Tuple2(record._1.toString(), record._2.get());
	  }
  }
  
  // String형의 key와 Integer형의 Tuple2를 받아서 Text형의 key와 IntWritable형의 Tuple2값으로 변환하는 클래스  
  static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
	  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
		  return new Tuple2(new Text(record._1), new IntWritable(record._2));
	  }
  }
  
  // HDFS상의 폴더밑 하위 파일을 지우는 함수
  static void delete_dir(String ff) {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name",  "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
	  
		  System.out.println("Home Path : " + dfs.getHomeDirectory());
		  System.out.println("Work Path : " + dfs.getWorkingDirectory());
		  
		  Path dir = new Path (ff);
		  if(dfs.exists(dir)) {
			  dfs.delete(dir, true);
		  }
	  } catch (Exception e) {
		  System.out.println("delete dir error ==>"+e);
	  }
  }
  
  // HDFS상의 폴더를 생성하는 함수
  static void mkdir_dir() {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name", "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
		  
		  Path dir = new Path ("formatted-out");
		  if( ! dfs.exists(dir)) {
			  dfs.mkdirs(dir);
		  }
	  } catch (Exception e) {
		  System.out.println("mk dir error ==>"+e);
	  }
  }
  
  
  
}


// json데이타를 Person1형의 iterator로 변환하는 클래스
class ParseJson implements FlatMapFunction<Iterator<String>, Person1> {
	public Iterator<Person1> call(Iterator<String> lines) throws Exception {
		ArrayList<Person1> people = new ArrayList<Person1>();
		
		Gson mapper = new Gson();
		while (lines.hasNext()) {
			String line = lines.next();
			try {
				System.out.println("line => "+line);
				Person1 person1 = mapper.fromJson(line, Person1.class);
				System.out.println("person1=>"+person1);
				people.add(person1);
			} catch (Exception e) {
				// 무시함
			}
		}
		return people.iterator();
	}
}

// Person1형의 데이타를 String형의 iterator로 변환하는 클래스
class WriteJson implements FlatMapFunction<Iterator<Person1>, String> {
	public Iterator<String> call(Iterator<Person1> people) throws Exception {
		ArrayList<String> text = new ArrayList<String>();
		while(people.hasNext()) {
			Person1 person = people.next();
			text.add("new string =>"+person.toString());
		}
		return text.iterator();
	}
}

class Person1 implements Serializable {
	String name;
	int age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "Person1 [name=" + name + ", age=" + age + "]";
	}
	
}

// 평균값을 구하는 클래스 1
class AvgCount implements Serializable {
	private static final long serialVersionUID = 134444L;
	public AvgCount (int total, int num) {
		this.total = total;
		this.num = num;
	}
	
	public int total;
	public int num;
	public double avg() {
		return total / (double) num;
	}
}


//평균값을 구하는 클래스 2
class AvgCount2 implements Serializable {
	private static final long serialVersionUID = -1683922668212126392L;
	public AvgCount2(int total, int num) { total_= total; num_=num;}
	
	public int total_;
	public int num_;
	public float avg() {
		return total_ / (float) num_;
	}
}


번호 제목 글쓴이 날짜 조회 수
440 VisualVM 1.3.9을 이용한 JVM 모니터링 file 총관리자 2016.10.27 332
439 Cloudera가 사용하는 서비스별 포트 총관리자 2018.03.29 326
438 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 총관리자 2017.04.06 324
437 [JSON 파싱]mongodb의 document를 GSON을 이용하여 parsing할때 ObjectId값에서 오류 발생시 조치방법 총관리자 2017.01.18 323
436 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable원인 총관리자 2015.04.27 322
435 cloudera-scm-agent 설정파일 위치및 재시작 명령문 총관리자 2018.03.29 321
434 [tomcat] logrotate를 이용하여 catalina.out로그파일 일별로 로테이션 저장하기 file 총관리자 2017.01.18 318
433 [Kudu] tablet server 혹은 kudu master가 어떤 원인에 의해서 replica가 failed상태인 경우 복구하는 방법 총관리자 2021.05.24 314
432 editLog의 문제로 발생하는 journalnode 기동 오류 발생시 조치사항 총관리자 2017.09.14 313
431 [Oozie]Disk I/O error: Failed to open HDFS file dhfs://..../tb_aaa/....OPYING 총관리자 2019.02.15 311
430 Cloudera설치중 실패로 여러번 설치하는 과정에 "Running in non-interactive mode, and data appears to exist in Storage Directory /dfs/nn. Not formatting." 오류가 발생시 조치하는 방법 총관리자 2018.03.29 309
429 cassandra cluster 문제가 있는 node제거 하기(DN상태의 노드가 있으면 cassandra cluster 전체에 문제가 발생하므로 반드시 제거할것) 총관리자 2017.06.21 309
428 Runtime.getRuntime().exec(cmd) sample 소스 총관리자 2015.11.19 305
427 spark-submit으로 spark application실행하는 다양한 방법 총관리자 2016.05.25 303
426 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 총관리자 2017.04.26 292
425 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 총관리자 2016.08.18 288
424 룰에 매칭되면 발생되는 엑티베이션 객체에 대한 작업(이전값 혹은 현재값)을 처리하는 클래스 파일 총관리자 2016.07.21 285
423 System Properties Comparison Elasticsearch vs. Hive vs. Jena file 총관리자 2016.03.10 285
422 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 총관리자 2017.04.19 284
421 impala2를 Cloudera Manager가 아닌 수동으로 설치하는 방법 총관리자 2018.05.30 281

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로