메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


spark 테스트 프로그램 몇개

package com.gooper.test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaSparkPi {
	
	static final SparkConf sparkConf=new SparkConf().setAppName("JavaSparkPi");
	static {
		sparkConf.setMaster("spark://gsda1:7077,gsda2:7077");
		sparkConf.setMaster("local[*]");

	}
    
	static final JavaSparkContext jsc=new JavaSparkContext(sparkConf);
	static  final Broadcast<List<String>> temp = jsc.broadcast(Arrays.asList("hello world", "", "hi park", "", "sss ff"));

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start................");

	// PI값 구하기
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map((z) ->  {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        if(x * x + y * y <= 1)  return 1; else return 0;
    }).reduce((s, s2) -> (s + s2));

    System.out.println("Pi is roughly " + 4.0 * count / n);

    
    System.out.println("=========== test start =================================");
    test(jsc);
    System.out.println("=========== test end =================================");
    
    System.out.println("=========== test2 start =================================");
    test2(jsc);
    System.out.println("=========== test2 end =================================");
    
    System.out.println("=========== test3 start =================================");
    test3(jsc);
    System.out.println("=========== test3 end =================================");

    System.out.println("=========== test4 start =================================");
    test4(jsc);
    System.out.println("=========== test4 end =================================");

    System.out.println("=========== test5 start =================================");
    test5(jsc);
    System.out.println("=========== test5 end =================================");

    System.out.println("=========== test6 start =================================");
    test6(jsc);
    System.out.println("=========== test6 end =================================");

    System.out.println("=========== test7 start =================================");
    test7(jsc);
    System.out.println("=========== test7 end =================================");

    System.out.println("=========== test8 start =================================");
    test8(jsc);
    System.out.println("=========== test8 end ================================="); 

    System.out.println("=========== test9 start =================================");
    test9(jsc);
    System.out.println("=========== test9 end =================================");

    System.out.println("=========== test10 start =================================");
    test10(jsc);
    System.out.println("=========== test10 end =================================");

    System.out.println("=========== test11 start =================================");
    test11(jsc);
    System.out.println("=========== test11 end =================================");

    System.out.println("=========== test12 start =================================");
    test12(jsc);
    System.out.println("=========== test12 end =================================");

    System.out.println("=========== test13 start =================================");
    test13(jsc);
    System.out.println("=========== test13 end =================================");

    System.out.println("=========== test14 start =================================");
    test14(jsc);
    System.out.println("=========== test14 end =================================");

    System.out.println("=========== test15 start =================================");
    test15();
    System.out.println("=========== test15 end =================================");

    System.out.println("=========== test16 start =================================");
    test16(jsc);
    System.out.println("=========== test16 end =================================");

    System.out.println("=========== test17 start =================================");
    test17(jsc);
    System.out.println("=========== test17 end =================================");

    System.out.println("=========== test18 start =================================");
    test18(jsc);
    System.out.println("=========== test18 end =================================");

    System.out.println("=========== test19 start =================================");
    test19(jsc);
    System.out.println("=========== test19 end =================================");
    
    jsc.stop();
    jsc.close();
    
    System.out.println("end................");
  }
  
  // List를 RDD로 변환하고 map연상을 통해서 x*x한 값으로 구성된 JavaRDD를 만들어서 화면에 출력 
  static void test (JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  JavaRDD<Integer> result = rdd.map((x) -> {return x*x;});
	  
	  System.out.println("result ==>"+StringUtils.join(result.collect(), ","));
  }
  
  // 문자열을 space로 분리(람다식을 이용함)하여 화면에 출력
  static void test2 (JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi"));
	  JavaRDD<String> words = lines.flatMap( (line) -> {return Arrays.asList(line.split(" ")).iterator();});
	  
	  System.out.println("result ==>"+StringUtils.join(words.collect(), ","));
  }
  
  
  // RDD의 함수인 disticnt, union, intersection, subtract, cartesian, countByValue를 적용하여 화면에 결과 출력 
  static void test3 (JavaSparkContext sc) {
	  JavaRDD<String> data1 = sc.parallelize(Arrays.asList("coffee", "coffee", "panda", "monkey", "tea"));
	  JavaRDD<String> data2 = sc.parallelize(Arrays.asList("coffee", "monkey", "kitty"));
	  JavaRDD<Integer> data3 = sc.parallelize(Arrays.asList(1,2,3));
	  
	  System.out.println("distinct ==>"+data1.distinct().collect());
	  System.out.println("union ==>"+data1.union(data2).collect());
	  System.out.println("intersection ==>"+data1.intersection(data2).collect());
	  System.out.println("subtract ==>"+data1.subtract(data2).collect());
	  System.out.println("cartesion ==>"+data1.cartesian(data3).collect());
	  System.out.println("countByValue ==>"+data1.countByValue());
  }
  
  
  // persist를 사용하여 reduce, fold등를 적용해보고, double형의 RDD로 변환하여 mean값을 구하여 출력함
  static void test4(JavaSparkContext sc) {
	  JavaRDD<Integer> data1 = sc.parallelize(Arrays.asList(1,2,3,4));
	  JavaRDD<Integer> data2 = sc.parallelize(Arrays.asList(3,4,5));
	  
	  List<Integer> data3 = new ArrayList<Integer>();
	  data3.add(1);
	  data3.add(2);
	  data3.add(3);
	  data3.add(4);
	  
	  JavaRDD<Integer> map = data1.map(x -> x+1);
	  map.persist(StorageLevel.MEMORY_AND_DISK());
	  
	  Function2<Integer, Integer, Integer> reduce = new Function2<Integer, Integer, Integer>() {
		 public Integer call(Integer x, Integer y) {
			 return x+y;
		 }
	  };
	  
	  DoubleFunction<Integer> df = new DoubleFunction<Integer>() {
		  public double call(Integer x) {
			  return (double) x;
		  }
	  };
	  
	  System.out.println("map==>"+data1.map(x -> x+1).reduce((x, y)->{return x+y;}));
	  
	  System.out.println("fold==>"+map.fold(0, reduce));
	  
	  map.foreach((x)->System.out.println(x));
	  
	  JavaDoubleRDD result = data1.mapToDouble((x) -> x);
	  System.out.println("mean ===>"+result.mean());
	  result.foreach((x) -> System.out.println(x));
	  
	  System.out.println("--------------------------");
	  
	  JavaDoubleRDD result2 = map.mapToDouble(df);
  	  System.out.println("mean by DoubleFuntion()===>"+result2.mean());
	  result2.foreach((x) -> System.out.println(x)); 

  }
  
  
  // 숫자형의 JavaRDD를 이용하여 aggregate함수를 사용하는 예제
  static void test5(JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  
	  Function2<AvgCount, Integer, AvgCount> addAndCount =
			  new Function2<AvgCount, Integer, AvgCount> (){
				private static final long serialVersionUID = 122222L;

				public AvgCount call(AvgCount a, Integer x) {
			  		a.total += x;
			  		a.num += 1;
			  		return a;
			  	}
			  };

	  Function2<AvgCount, AvgCount, AvgCount> combine =
	  new Function2<AvgCount, AvgCount, AvgCount> (){
		private static final long serialVersionUID = 11111L;

		public AvgCount call(AvgCount a, AvgCount b) {
	  		a.total += b.total;
	  		a.num += b.num;
	  		return a;
	  	}
	  };

	  AvgCount initial = new AvgCount(0, 0);
	  AvgCount result = rdd.aggregate(initial, addAndCount, combine);
      System.out.println(result.avg());
  }
  
  
  // 문자열 RDD를 key/value형태로 바꾸고 key를 기준을 작동하는 sortByKey, reduceByKey, groupByKey, sortByKey를 적용해보는 예제
  static void test6(JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi", "hi park", "dk"));
	  
	PairFunction<String, String, Integer> keyData = 
		new PairFunction<String, String, Integer>() {
		public Tuple2<String, Integer> call(String x) {
			return new Tuple2(x.split(" ")[0], x.length());
		}
	};
	
	JavaPairRDD<String, Integer> pairs = lines.mapToPair(keyData);
	pairs.foreach(x->System.out.println(x));
	
	JavaPairRDD<String, Integer> reduceByKey = pairs.reduceByKey(   (x, y) -> { return (x+y);} );
	JavaPairRDD<String, Iterable<Integer>> groupByKey = pairs.groupByKey();

	JavaPairRDD<String, Integer> sortByKey = pairs.sortByKey(false);
	
	System.out.println("reduceByKey =>"+reduceByKey.collect() );
	
	System.out.println("groupByKey =>"+groupByKey.collect() );
	
	System.out.println("sortBykey => "+sortByKey.collect() );
  }

  //   Tuple2형의 RDD를 이용하여 mapToPair를 적용하여 JavaPairRDD를 만들면서 각각의 RDD를 출력해보는 예제
  //  두개의 JavaPairRDD를 이용하여 subtractByKey, subtract, rightOuterJoin, leftOuterJoin, cogroup를 적용해보는 예제 
  static void test7(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  data2.add(new Tuple2("c",4));
	  
	  JavaRDD<Tuple2<String, Integer>> pdataa1 = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdataa11 = pdataa1.mapToPair(  (x) -> {return new Tuple2(x._1, x._2);}  );
	  
	  System.out.println("pdataa1 ==>"+pdataa1);
	  System.out.println("pdataa11 ==>"+pdataa11);
	  
	  System.out.println("pdataa1 ==>"+pdataa1.collect());
	  System.out.println("pdataa11 ==>"+pdataa11.collect());
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, Integer> pdata2 = sc.parallelizePairs(data2);
	  
	  System.out.println("pdata1 ==>"+pdata1.collect());
	  System.out.println("pdata2 ==>"+pdata2.collect());

	System.out.println("subtractByKey =>"+pdataa11.subtractByKey(pdata2).collect());
	System.out.println("subtract =>"+pdata1.subtract(pdata2).collect());
	
	System.out.println("join =>"+pdata1.join(pdata2).collect());
	System.out.println("rightOuterJoin =>"+pdata1.rightOuterJoin(pdata2).collect());
	System.out.println("leftOuterJoin =>"+pdata1.leftOuterJoin(pdata2).collect());
	
	System.out.println("cogroup =>"+pdata1.cogroup(pdata2).collect());

	Function<Integer, Integer> ff = new Function<Integer, Integer>() {
		private static final long serialVersionUID = 11234L;
		int sum = 100;
		public Integer call (Integer x) {
			sum += x;
			return sum;
		}
	};
	
	System.out.println("mapValues =>" + pdata1.mapValues(ff).collect()); 
	
	//System.out.println("mapValues 1=>" + pdata1.reduce( (x2._2) -> {return x2._2;})           );
	System.out.println("reduce =>" + pdata1.reduce( (x2, y2) -> {return new Tuple2("sum of all elements", (x2._2+ y2._2) );})           );
  }
  
  // String형의 List를 JavaRDD를 만들고 이를 JavaPairRDD로 변환후 reduceByKey를 적용하는 예제
  static void test8(JavaSparkContext sc) {
	  List<String> data1 = new ArrayList<String>();
	  data1.add("ab");
	  data1.add("abcd");
	  data1.add("ab");
	  data1.add("cd");
	  
	  JavaRDD<String> pdata = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdata1 = pdata.mapToPair(x-> {return new Tuple2(x, 1);});
	  System.out.println("mapToPair==>"+pdata1.collect());
	  
	  JavaPairRDD<String, Integer> pdata2 = pdata1.reduceByKey( (x, y)-> { return (x+y); } );
	  System.out.println("reduceByKey==>"+pdata2.collect() );

  }
  
  //   sc.parallelizePairs를 이용하여  JavaPairRDD를 만들고 combineByKey 적용하고 Map으로 collect후에 출력하는 예제
  static void test9(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));
	  data1.add(new Tuple2("a", 10));
	  data1.add(new Tuple2("c", 9));
	  
	  JavaPairRDD<String, Integer> pdata = sc.parallelizePairs(data1);
	  
	  Function<Integer, AvgCount2> createCombiner = new Function<Integer, AvgCount2>() {
			public AvgCount2 call(Integer x) {
				return new AvgCount2(x, 1);
			}
		};

		Function2<AvgCount2, Integer, AvgCount2> mergeValue =
			new Function2<AvgCount2, Integer, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, Integer x) {
				a.total_ += x;
				a.num_ += 1;
				return a;
			}
		};

		Function2<AvgCount2, AvgCount2, AvgCount2> mergeCombiner =
			new Function2<AvgCount2, AvgCount2, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, AvgCount2 b) {
				a.total_ += b.total_;
				a.num_ += b.num_;
				return a;
			}
		};
	  
	  JavaPairRDD<String, AvgCount2> avgCounts =  pdata.combineByKey(createCombiner, mergeValue, mergeCombiner);
	  Map<String, AvgCount2> countMap = avgCounts.collectAsMap();

	  for(Entry<String, AvgCount2> entry : countMap.entrySet()) {
	  	System.out.println(entry.getKey() + ":" + entry.getValue().avg());
	  }
  
  }
  
  // 두개의 Tuple2형 List를 이용하여 sc.paralledizePairs를 이용하여 JavaPairRDD로 변환하고 leftOuterJoin를 적용하는 예제
  static void test10(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("x", 10));	  
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));

	  
	  
	  List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
	  data2.add(new Tuple2("a", "aa"));
	  data2.add(new Tuple2("b", "bb"));
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, String> pdata2 = sc.parallelizePairs(data2);
	  
	  pdata1.sortByKey(true);
	  
	 JavaPairRDD<String, Tuple2<Integer,Optional<String>>> result = pdata1.leftOuterJoin(pdata2);

	 System.out.println("pdata1==>"+pdata1.collect());
	 System.out.println("pdata2==>"+pdata2.collect());
	 
	 System.out.println("result==>"+result.collect());
  
  }
  
  // test.json파일을 일거서 메모리에 적재하고 데이타를 파싱하여 Person1 객체에 담아 partition별로 map작업을 수행고 HDFS에 저장하는 예제  
  static void test11(JavaSparkContext sc) {
	  String dir = "formatted-out";
	  
   	 JavaRDD<String> input = sc.textFile("file:///tmp/test.json", 5).persist(StorageLevel.MEMORY_ONLY_2());
   	 
   	 JavaRDD<Person1> result = input.mapPartitions(new ParseJson());
   	 
   	 System.out.println("persons from json ===>"+result.collect());
   	 
   	 JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
   	 
   	 delete_dir(dir);
   	 //mkdir_dir();
   	
   	 formatted.saveAsTextFile(dir);
  
  }
  
  // Tuple2형의 List를 key/value형의 JavaPairRDD로 변환하고 ConvertToWritableTypes 객체를 이용하여 IntWritable형으로 변환하여
  // SequenceFileOutputFormat으로 저장하는 예제
  static void test12(JavaSparkContext sc) {
	  String dir = "sequence-write";
	  
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data1);
	  
	  JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
	  
	  delete_dir(dir);
	  
	  System.out.println("Native Values before ==>"+data1.toString());
	   	 
	  result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class,	SequenceFileOutputFormat.class);
	  System.out.println("Saved as SequenceFileOutputFormat.class");
  }
  
  // Writable형의 저장되어 있는 sequence format의 data를 읽어서 원래의 값으로 변환하는 예제
  static void test13(JavaSparkContext sc) {
	  String fileName = "sequence-write";
	  JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
	  JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
	  
	  System.out.println("Native Values after ====>"+result.collect());
  }
  
  
  // HIveContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test14(JavaSparkContext sc) {
	  HiveContext ctx = new HiveContext(sc);
	  
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #1 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // SparkSession을 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0이상에서는 이것만 유효함)
  static void test15() {
	  SparkSession session = new SparkSession.Builder().appName("SparkJoinExample").master("local").enableHiveSupport().getOrCreate();
      Dataset<Row> dset = session.sql("select * from default.test_table");
      
	  System.out.println("select result #2 =>"+dset.toJavaRDD().collect());
  }

  // SQLContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test16(JavaSparkContext sc) {
	  SQLContext ctx = new SQLContext(sc);
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #3 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // accumulator 변수를 사용하는 예제
  static void test17(JavaSparkContext sc) {
	  final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
				
	  			delete_dir("output.txt");
	  
				callSigns.saveAsTextFile("output.txt");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // accumulator와 broadcast변수를 동시에 사용하는 예제
  static void test18(JavaSparkContext sc) {

      final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					  System.out.println("str in broadcasted  ==>"+temp.value());
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
	  
	  			delete_dir("output.txt2");
				
				callSigns.saveAsTextFile("output.txt2");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // 문자형 List를 JavaRDD로 변환하고 mapToDobule를 이용하여 double형으로 변환하고 집계함수인 sum, mean, variance, stdev등을 적용해보는 예제
  static void test19(JavaSparkContext sc) {
	  JavaRDD<String> age = sc.parallelize(Arrays.asList("1","2","3","4","5"));
	  JavaDoubleRDD doubleAge = age.mapToDouble(new DoubleFunction<String>() {
		  public double call(String value) {
			  return Double.parseDouble(value);
		  }});
	  
	  System.out.println("sum = "+doubleAge.sum());
	  System.out.println("mean ="+doubleAge.mean());
	  System.out.println("variance ="+doubleAge.variance());
	  System.out.println("stdev ="+doubleAge.stdev());
  }
 
  // Tuple2<Text, IntWritable>값을 받아서 String형 key와 Integer형 value로 변환하는 클래스
  static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
	  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
		  return new Tuple2(record._1.toString(), record._2.get());
	  }
  }
  
  // String형의 key와 Integer형의 Tuple2를 받아서 Text형의 key와 IntWritable형의 Tuple2값으로 변환하는 클래스  
  static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
	  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
		  return new Tuple2(new Text(record._1), new IntWritable(record._2));
	  }
  }
  
  // HDFS상의 폴더밑 하위 파일을 지우는 함수
  static void delete_dir(String ff) {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name",  "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
	  
		  System.out.println("Home Path : " + dfs.getHomeDirectory());
		  System.out.println("Work Path : " + dfs.getWorkingDirectory());
		  
		  Path dir = new Path (ff);
		  if(dfs.exists(dir)) {
			  dfs.delete(dir, true);
		  }
	  } catch (Exception e) {
		  System.out.println("delete dir error ==>"+e);
	  }
  }
  
  // HDFS상의 폴더를 생성하는 함수
  static void mkdir_dir() {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name", "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
		  
		  Path dir = new Path ("formatted-out");
		  if( ! dfs.exists(dir)) {
			  dfs.mkdirs(dir);
		  }
	  } catch (Exception e) {
		  System.out.println("mk dir error ==>"+e);
	  }
  }
  
  
  
}


// json데이타를 Person1형의 iterator로 변환하는 클래스
class ParseJson implements FlatMapFunction<Iterator<String>, Person1> {
	public Iterator<Person1> call(Iterator<String> lines) throws Exception {
		ArrayList<Person1> people = new ArrayList<Person1>();
		
		Gson mapper = new Gson();
		while (lines.hasNext()) {
			String line = lines.next();
			try {
				System.out.println("line => "+line);
				Person1 person1 = mapper.fromJson(line, Person1.class);
				System.out.println("person1=>"+person1);
				people.add(person1);
			} catch (Exception e) {
				// 무시함
			}
		}
		return people.iterator();
	}
}

// Person1형의 데이타를 String형의 iterator로 변환하는 클래스
class WriteJson implements FlatMapFunction<Iterator<Person1>, String> {
	public Iterator<String> call(Iterator<Person1> people) throws Exception {
		ArrayList<String> text = new ArrayList<String>();
		while(people.hasNext()) {
			Person1 person = people.next();
			text.add("new string =>"+person.toString());
		}
		return text.iterator();
	}
}

class Person1 implements Serializable {
	String name;
	int age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "Person1 [name=" + name + ", age=" + age + "]";
	}
	
}

// 평균값을 구하는 클래스 1
class AvgCount implements Serializable {
	private static final long serialVersionUID = 134444L;
	public AvgCount (int total, int num) {
		this.total = total;
		this.num = num;
	}
	
	public int total;
	public int num;
	public double avg() {
		return total / (double) num;
	}
}


//평균값을 구하는 클래스 2
class AvgCount2 implements Serializable {
	private static final long serialVersionUID = -1683922668212126392L;
	public AvgCount2(int total, int num) { total_= total; num_=num;}
	
	public int total_;
	public int num_;
	public float avg() {
		return total_ / (float) num_;
	}
}


번호 제목 글쓴이 날짜 조회 수
419 lagom을 이용한 샘플 경매 프로그램 실행방법 총관리자 2017.06.20 181
418 원격 리포지토리에서 최초 clone시 Permission denied (publickey). 오류발생시 조치사항 총관리자 2017.06.20 871
417 .git폴더를 삭제하고 다시 git에 추가하고 서버에 반영하는 방법 총관리자 2017.06.19 4075
416 [Dovecot] -ERR [SYS/PERM] Permission denied 총관리자 2017.06.13 237
415 숭실대 교수님등 강의영상(바이오데이터마이닝, 빅데이터분산컴퓨팅, 컴퓨터 그래픽스, 데이터베이스응용및 프로그램밍, 데이터베이스, 의생명영상처리, 웹그로그래밍, 데이터마이닝, 컴퓨터구조) file 총관리자 2017.06.13 205
414 시맨틱 관련 논문 모음 사이트 총관리자 2017.06.13 94
413 [dovecot]dovecot restart할때 root@gsda4:/usr/lib/dovecot# service dovecot restart 오류 발생시 조치사항 총관리자 2017.06.12 492
412 sendmail + dovecot(pop3) + saslauthd 설치 총관리자 2017.06.11 184
411 sendmail전송시 421 4.3.0 collect: Cannot write ./dfv5BA2EBS010579 (bfcommit, uid=0, gid=114): No such file or directory 발생시 조치사항 총관리자 2017.06.11 694
410 Eclipse 에서 bitbucket.org 연동 하기 file 총관리자 2017.06.08 269
409 [u-Auctions]목록이 1개만 나오는 문제 총관리자 2017.05.29 38
408 Ubuntu 16.04 LTS에서 sendmail설치및 설정(수신,발신 가능)및 메일서버 만들기 총관리자 2017.05.23 1112
407 Ubuntu 16.04LTS 설치후 초기에 주어야 하는 작업(php, apache, mariadb설치및 OS보안설정등) file 총관리자 2017.05.23 5268
406 Ubuntu 16.04 LTS에서 사이트에 무료인증서를 이용하여 SSL적용 file 총관리자 2017.05.23 353
405 fuseki용 config-examples.ttl 예시 내용 총관리자 2017.05.17 646
404 webid에서 google처럼 검색할 수 있도록 하는 프로그램 총관리자 2017.05.16 46
403 mysql-server 기동시 Do you already have another mysqld server running on port 오류 발생할때 확인및 조치방법 총관리자 2017.05.14 2664
402 php auction 프로그램 총관리자 2017.05.14 94
401 [PHP7.0]로그파일 위치 총관리자 2017.05.07 151
400 mapreduce appliction을 실행시 "is running beyond virtual memory limits" 오류 발생시 조치사항 총관리자 2017.05.04 16895

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로