메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 글쓴이 날짜 조회 수
381 hive 0.13.1 설치 + meta정보는 postgresql 9.3에 저장 총관리자 2015.04.30 227
380 [postgresql 9.x] PostgreSQL Replication 구축하기 총관리자 2018.07.17 226
379 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 총관리자 2017.04.26 226
378 Cloudera설치중에 "Error, CM server guid updated"오류 발생시 조치방법 총관리자 2018.03.29 225
377 [shell script] 파일을 한줄씩 읽어서 파일내용으로 명령문 만들고 실행하는 shell script예제 총관리자 2017.02.21 224
376 lagom의 online-auction-java프로젝트 실행시 "Could not find Cassandra contact points, due to: ServiceLocator is not bound" 경고 발생시 조치사항 총관리자 2017.10.12 219
375 Hue impala에서 query결과를 HDFS 파일로 export시 AuthorizationException: User 'gooper1234' does not have privileges to access: db명.query_impala_123456 총관리자 2022.03.17 217
374 impala테이블 쿼리시 max_row_size 관련 오류가 발생할때 조치사항 총관리자 2020.02.12 217
373 [sentry]role부여후 테이블명이 변경되어 오류가 발생할때 조치방법 총관리자 2018.10.16 215
372 Error: java.lang.RuntimeException: java.lang.OutOfMemoryError 오류가 발생하는 경우 총관리자 2018.09.20 215
371 프로그래밍 언어별 딥러닝 라이브러리 정리 file 총관리자 2016.10.05 215
370 AIX 7.1에 Hadoop설치(정리중#2) 총관리자 2016.09.20 215
369 우분투 서버에 GUI로 접속하기 file 총관리자 2018.05.27 212
368 /etc/logrotate.d 을 이용한 catalina.out 나누기 file 총관리자 2017.01.19 212
367 lagom의 online-auction-java프로젝트 실행시 외부의 kafka/cassandra를 사용하도록 설정하는 방법 총관리자 2017.10.12 211
366 [개발] 온라인 IDE - 개발 환경 구축 없어 어디서나 웹브라우저로 개발하기 총관리자 2022.05.02 207
365 숭실대 교수님등 강의영상(바이오데이터마이닝, 빅데이터분산컴퓨팅, 컴퓨터 그래픽스, 데이터베이스응용및 프로그램밍, 데이터베이스, 의생명영상처리, 웹그로그래밍, 데이터마이닝, 컴퓨터구조) file 총관리자 2017.06.13 207
364 bin/start-hbase.sh실행시 org.apache.hadoop.hbase.util.FileSystemVersionException: HBase file layout needs to be upgraded오류가 발생하면 조치사항 총관리자 2016.08.01 206
363 [Jsoup]특정페이지를 jsoup을 이용하여 파싱하는 샘플소스 총관리자 2017.04.18 205
362 Hadoop 완벽 가이드 정리된 링크 총관리자 2016.04.19 205

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로