메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 글쓴이 날짜 조회 수
420 impala2를 Cloudera Manager가 아닌 수동으로 설치하는 방법 총관리자 2018.05.30 280
419 nc -l 7777 : 7777포트에서 입력을 받는다. 총관리자 2017.03.23 278
418 SPARQL의 유형, SPARQL 만들기등에 대한 설명 총관리자 2016.02.18 274
417 Eclipse 에서 bitbucket.org 연동 하기 file 총관리자 2017.06.08 272
416 solr 데몬이 떠있는 동안 hadoop이 다운되는 경우 Index dir 'hdfs://mycluster/user/../core_node2/data/index/' of core 'gc_shard1_replica2' is already locked라논 오류가 발생하는데 이에 대한 조치사항 총관리자 2018.01.04 268
415 hbase가 기동시키는 zookeeper에서 받아드리는 ip가 IPv6로 사용되는 경우가 있는데 이를 IPv4로 강제적용하는 방법 총관리자 2015.05.08 267
414 [sap] Error: java.io.IOException: SQLException in nextKeyValue 오류 발생 총관리자 2020.06.08 266
413 [MemoryLeak분석]다수의 MongoCleaner 쓰레드가 Sleep상태에 있으면서 Full GC가 계속 발생되는 문제 해결방법 file 총관리자 2017.01.11 262
412 hive metastore db중 TBLS, TABLE_PARAMS테이블 설명 총관리자 2021.10.22 259
411 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 총관리자 2017.07.26 259
410 kafka에서 메세지 중복 consume이 발생할 수 있는 상황 총관리자 2018.10.23 258
409 fuseki에 update하는 방법(java api이용)및 주의 사항 총관리자 2015.12.30 258
408 sparql 1.1 BIND(if() as ?bind변수) 버그로 추정되는 문제점및 해결방안 -> select 문에 (if(,,) as ?bind변수) file 총관리자 2016.01.21 257
407 Not enough replica available for query at consistency QUORUM가 발생하는 경우 총관리자 2017.06.21 256
406 DB별 JDBC 드라이버 총관리자 2015.10.02 256
405 ?a는 모두 표시하면서 ?b와 비교하여 ?a=?b는 표시하고 ?a!=?b 인경우는 ""로 구성된 결과 집합을 구하는 경우 file 총관리자 2016.01.29 255
404 HiveServer2인증을 PAM을 이용하도록 설정하는 방법 총관리자 2018.07.21 254
403 How-to: Build a Complex Event Processing App on Apache Spark and Drools file 총관리자 2016.10.31 253
402 우분투에서 패키지 설치시 E: Sub-process /usr/bin/dpkg returned an error code 발생시 조치 총관리자 2017.05.02 251
401 centos에 sbt 0.13.5 설치 총관리자 2016.05.30 251

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로