메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
	private static final long serialVersionUID = -2895832218133628236L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
	
	private final int NUM_THREADS = 3;		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
		try {
			avroDataSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		Properties properties = new Properties();
		
		//class name을 user_id, grup_id로 사용함
		properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
		properties.put("group.id",group_id);
		properties.put("zookeeper.session.timeout.ms", "6000");
		properties.put("zookeeper.sync.time.ms", "2000");
		properties.put("auto.commit.enable", "true");
		properties.put("auto.commit.interval.ms", "5000");
		properties.put("fetch.message.max.bytes", "31457280");		// 30MB		
		properties.put("auto.offset.reset", "smallest");
		
		final ConsumerConnector consumer = 
				Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(TOPIC, NUM_THREADS);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 	consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
		ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
		
		for (int m = 0; m < NUM_THREADS; m++) {
			executor.execute(new ConsumerT(streams.get(m)));
		}
		
	}
	
	public class ConsumerT implements Runnable {
		private KafkaStream<byte[], byte[]> stream;
		private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
		
		public ConsumerT(KafkaStream<byte[], byte[]> stream) {
			super();
			this.stream = stream;
		}
		
		@Override
		public void run(){
			for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
				
				StringBuffer sb = new StringBuffer();
					
				byte[] message = (byte[]) messageAndMetadata.message();
				
				BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
				COL_ONEM2M read = null;
				String task_group_id = "";
				String task_id =  "";
				String start_time =  "";
				String colFrom =  "";
				String calcuate_latest_yn =  "";
				 
				try {
					 read = specificDatumReader.read(null, binaryDecoder);
					 
					 List<java.lang.CharSequence> data= read.getData();
					 
					 task_group_id = read.getTaskGroupId().toString();
					 task_id = read.getTaskId().toString();
						
				// 처리에 필요한 로직
				// .....
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

번호 제목 날짜 조회 수
410 "bad handshake: Error([('SSL routines', 'ssl3_get_server_certificate', 'certificate verify failed')])" 오류는 CA인증을 하지 못해서 발생함 2022.05.13 3995
409 Hive JDBC Connection과 유형별 에러및 필요한 jar파일 2021.05.24 4003
408 Hadoop - 클러스터 세팅및 기동 2015.04.28 4004
407 Scala버젼 변경 혹은 상황에 맞게 Spark소스 컴파일하기 2016.05.31 4008
406 kafka의 re-balance를 이용하여 consumer를 multi thread로 돌려서 topic의 partitions을 활용 2015.03.31 4019
405 Hue Load Balancer를 L4로 L/B하는 경우는 L4쪽 도멘인으로 발행된 인증서를 TLS/SSL항목에 설정해주어야 한다. 2021.10.08 4022
404 collection생성혹은 collection조회시 Plugin init failure for [schema.xml] fieldType "pdate": Error loading class 'solr.IntField' 오류 조치사항 2022.04.07 4022
403 우분투 16.04LTS에 Jupyter설치 2018.04.17 4023
402 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 4024
401 Oracle RAC 구성된 DB서버에 대한 컴포넌트별 설정 방법 2022.02.12 4025
400 [Jsoup]특정페이지를 jsoup을 이용하여 파싱하는 샘플소스 2017.04.18 4031
399 spark 시동중 applicationHistory 로그 디렉토리가 없다고 하면서 기동되지 않는 경우 2018.06.01 4038
398 halyard의 console스크립트에서 생성한 repository는 RDF4J Web Applications에서 공유가 되지 않는다. 2017.07.05 4041
397 solr 인스턴스 기동후 shard에 서버가 정상적으로 할당되지 않는 경우 해결책 2016.04.29 4042
396 여러 홈페이지를 운영하거나 혹은 서버에 가입한 사용자들에게 홈페이지 계정을 나누어 줄수 있도록 설정/계정 생성방법 2018.01.23 4042
395 kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 2016.05.02 4052
394 Impala daemon기동시 "Could not create temporary timezone file"오류 발생시 조치사항 2018.03.29 4060
393 scala-eclipse 다운로드 2019.06.09 4060
392 schema설정없이 hive를 최초에 실행했을때 발생하는 오류메세지및 처리방법 2016.09.25 4061
391 drools에서 drl관련 로그를 기록하기 위한 클래스 파일 2016.07.21 4069
위로