메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 글쓴이 날짜 조회 수
421 github에 있는 프로젝트와 로컬에서 작업한 프로젝트 합치기 총관리자 2016.11.22 40
420 원격의 origin/master를 기준으로 dev branch를 만들어 작업후 원격의 origin/dev에 push하는 방법 file 총관리자 2016.11.22 44
419 Mountable HDFS on CentOS 6.x(hadoop 2.7.2의 nfs기능을 이용) 총관리자 2016.11.24 174
418 S2RDF를 이용한 다른 버젼의 github링크 총관리자 2016.12.02 51
417 Jena 2.3를 Hadoop 2.7.2의 NFS로 mount하고 fuseki를 이용하여 start할때 오류 메세지 총관리자 2016.12.02 1557
416 jena의 data폴더를 hadoop nfs를 이용하여 HDFS상의 폴더에 마운트 시키고 fuseki를 통하여 inert를 시도했을때 transaction 오류 발생 총관리자 2016.12.02 82
415 hbase startrow와 endrow를 지정하여 검색하기 샘플 총관리자 2016.12.07 70
414 ResultSet에서 데이타를 List<Map<String,String>>형태로 만들어서 리턴하는 소스(Collections.sort를 이용한 정렬 가능) 총관리자 2016.12.15 243
413 Collections.sort를 이용한 List<User>형태의 데이타 정렬(숫자, 문자에 대해서 각각 asc/desc및 복합정렬) 총관리자 2016.12.15 114
412 Collections.sort를 이용한 List<Map<String, String>>형태의 데이타 정렬 소스 총관리자 2016.12.15 45
411 mongodb aggregation query를 Java code로 변환한 샘플 총관리자 2016.12.15 777
410 MongoDB에 있는 특정컬럼의 값을 casting(string->integer)하여 update하기 java 소스 총관리자 2016.12.19 88
409 like검색한 결과를 기준으로 집계를 수행하는 java 소스 총관리자 2016.12.19 129
408 Class.forName을 이용한 메서드 호출 샘플소스 총관리자 2016.12.21 100
407 new Gson().toJson(new ObjectId())을 사용하면 값이 다르게 나오는 경우가 있음 총관리자 2016.12.23 44
406 List<Map<String, String>>형태의 데이타에서 중복제거 하는 방법 총관리자 2016.12.23 1695
405 Halyard - RDF4J와 Apache HBase를 이용하여 구현된 TripleStore이며 SPARQL 1.1쿼리를 지원한다. 총관리자 2016.12.29 630
404 eclipse 3.1 단축키 정리파일 총관리자 2017.01.02 38
403 [MemoryLeak분석]다수의 MongoCleaner 쓰레드가 Sleep상태에 있으면서 Full GC가 계속 발생되는 문제 해결방법 file 총관리자 2017.01.11 263
402 spark 2.0.0를 windows에서 실행시 로컬 파일을 읽을때 발생하는 오류 해결 방법 총관리자 2017.01.12 106

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로