메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


*Spark에서 KafkaUtils를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 Java 소스이다.

topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.


*샘플프로그램

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDataSparkSubscribe implements Serializable {
	private static final long serialVersionUID = 1333478786266564011L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class);
	
	private final TripleService tripleService = new TripleService();	
	private final int NUM_THREADS = 3;
		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe();
		try {
			avroDataSparkSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe")
				 .set("spark.ui.port", "4042")
				 .set("spark.blockManager.port", "38020")
				 .set("spark.broadcast.port", "38021")
				 .set("spark.driver.port", "38022")
				 .set("spark.executor.port", "38023")
				 .set("spark.fileserver.port", "38024")
				 .set("spark.replClassServer.port", "38025")
				 .set("spark.driver.memory", "4g")
				 .set("spark.executor.memory", "4g")
				 ;
		JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

		Map<String, String> conf = new HashMap<String, String>();
				//class name을 user_id, grup_id로 사용함
				conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
				conf.put("group.id",group_id);
				conf.put("zookeeper.session.timeout.ms", "6000");
				conf.put("zookeeper.sync.time.ms", "2000");
				conf.put("auto.commit.enable", "true");
				conf.put("auto.commit.interval.ms", "5000");
				conf.put("fetch.message.max.bytes", "31457280");		// 30MB		
				conf.put("auto.offset.reset", "smallest");
		
		jssc.checkpoint("/tmp");
		Map<String, Integer> topic = new HashMap<String, Integer>();
		topic.put(TOPIC, NUM_THREADS);

		try {
			JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY());
		    JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2());
		    
			Function <byte[], String> wrkF =
					  new Function<byte[], String> (){
						private static final long serialVersionUID = 4509609657912968079L;

						public String call(byte[] x) {
							BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null);
							SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
							try {
								COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder);								
								new ConsumerT(read).go();
							} catch (Exception e) {
								log.debug("xxx=>"+e.getMessage());
							}
							return "";							
						}
					  };
					  
			JavaDStream<String> rst = lines.map(wrkF);
			
			// action을 위해서...
			rst.print();
			
			jssc.start();
			jssc.awaitTermination();
		} catch (Exception e) {
			e.printStackTrace();
		  log.debug("exception : "+e.getMessage());
		}
	}
	
	public class ConsumerT implements Serializable {
		private static final long serialVersionUID = 7697840079748720000L;
		private COL_ONEM2M read;
		
		public ConsumerT(COL_ONEM2M read) {
			super();
			this.read = read;
		}
		
		public void go(){
			StringBuffer sb = new StringBuffer();
				
			String task_group_id = "";
			String task_id =  "";
			String start_time =  "";
			
			try {
				 List<java.lang.CharSequence> data= read.getData();
				 
				 task_group_id = read.getTaskGroupId().toString();
				 task_id = read.getTaskId().toString();
				 start_time = read.getStartTime().toString();

				 // 필요한 로직 ..... 
				 
			} catch (Exception e) {
				e.printStackTrace();
			} // try
		} // go method
	} // ConsumerT class
}

번호 제목 날짜 조회 수
85 dual table만들기 2014.05.16 4246
84 spark 온라인 책자링크 (제목 : mastering-apache-spark) 2016.05.25 4248
83 [impala]쿼리 수행중 발생하는 오류(due to memory pressure: the memory usage of this transaction, Failed to write to server) 2022.10.05 4259
82 CentOS 7.x에 Jupyter설치 2018.04.18 4272
81 external partition table생성및 data확인 2014.04.03 4280
80 [TLS/SSL]Kudu Master 설정하기 2022.05.13 4294
79 Scala를 이용한 Streaming예제 2018.03.08 4302
78 AIX 7.1에서 hive실행시 "hive: line 86: readlink: command not found" 오류가 발생시 임시 조치사항 2016.09.25 4303
77 AnalysisException: Incomplatible return type 'DECIMAL(38,0)' and 'DECIMAL(38,5)' of exprs가 발생시 조치 2021.07.26 4317
76 Soft memory limit exceeded (at 100.05% of capacity) 오류 조치 2022.01.17 4323
75 conda를 이용한 jupyterhub(v0.9)및 jupyter설치 (v4.4.0) 2018.07.30 4325
74 Apache Spark와 Drools를 이용한 CEP구현 테스트 2016.07.15 4361
73 scala application 샘플소스(SparkSession이용) 2018.03.07 4366
72 spark-env.sh에서 사용할 수있는 항목. 2016.05.24 4378
71 kudu hms check 사용법(예시) 2021.10.22 4401
70 [Kudu]Schema별 혹은 테이블별 사용량(Replica포함) 구하는 방법 2022.07.14 4404
69 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 2017.07.26 4409
68 spark-submit으로 spark application실행하는 다양한 방법 2016.05.25 4410
67 [CDP7.1.7]Impala Query의 Memory Spilled 양은 ScratchFileUsedBytes값을 누적해서 구할 수 있다. 2022.07.29 4423
66 hive query에서 mapreduce돌리지 않고 select하는 방법 2014.05.23 4430
위로