메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
	private static final long serialVersionUID = -2895832218133628236L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
	
	private final int NUM_THREADS = 3;		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
		try {
			avroDataSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		Properties properties = new Properties();
		
		//class name을 user_id, grup_id로 사용함
		properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
		properties.put("group.id",group_id);
		properties.put("zookeeper.session.timeout.ms", "6000");
		properties.put("zookeeper.sync.time.ms", "2000");
		properties.put("auto.commit.enable", "true");
		properties.put("auto.commit.interval.ms", "5000");
		properties.put("fetch.message.max.bytes", "31457280");		// 30MB		
		properties.put("auto.offset.reset", "smallest");
		
		final ConsumerConnector consumer = 
				Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(TOPIC, NUM_THREADS);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 	consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
		ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
		
		for (int m = 0; m < NUM_THREADS; m++) {
			executor.execute(new ConsumerT(streams.get(m)));
		}
		
	}
	
	public class ConsumerT implements Runnable {
		private KafkaStream<byte[], byte[]> stream;
		private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
		
		public ConsumerT(KafkaStream<byte[], byte[]> stream) {
			super();
			this.stream = stream;
		}
		
		@Override
		public void run(){
			for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
				
				StringBuffer sb = new StringBuffer();
					
				byte[] message = (byte[]) messageAndMetadata.message();
				
				BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
				COL_ONEM2M read = null;
				String task_group_id = "";
				String task_id =  "";
				String start_time =  "";
				String colFrom =  "";
				String calcuate_latest_yn =  "";
				 
				try {
					 read = specificDatumReader.read(null, binaryDecoder);
					 
					 List<java.lang.CharSequence> data= read.getData();
					 
					 task_group_id = read.getTaskGroupId().toString();
					 task_id = read.getTaskId().toString();
						
				// 처리에 필요한 로직
				// .....
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

번호 제목 글쓴이 날짜 조회 수
15 kafka broker기동시 brokerId가 달라서 기동에 실패하는 경우 조치방법 총관리자 2016.05.02 2325
14 avro 사용하기(avsc 스키마 파일 컴파일 방법, consumer, producer샘플소스) 총관리자 2016.07.08 1263
13 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 총관리자 2016.10.31 1017
12 kafka-manager 1.3.3.4 설정및 실행하기 총관리자 2017.03.20 617
11 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 총관리자 2016.11.08 524
10 kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 총관리자 2016.05.02 412
9 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 총관리자 2016.08.18 288
8 kafka에서 메세지 중복 consume이 발생할 수 있는 상황 총관리자 2018.10.23 258
7 kafka의 re-balance를 이용하여 consumer를 multi thread로 돌려서 topic의 partitions을 활용 총관리자 2015.03.31 248
6 down된 broker로 메세지를 전송하려는 경우의 오류 내용및 조치사항 총관리자 2016.08.12 238
» Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 총관리자 2017.04.26 226
4 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 총관리자 2016.10.24 156
3 No broker partitions consumed by consumer thread오류 발생시 확인/조치할 사항 총관리자 2016.09.02 151
2 producer / consumer구현시 설정 옵션 설명 총관리자 2016.10.19 121
1 kafkaWordCount.scala의 producer와 consumer 클래스를 이용하여 kafka를 이용한 word count 테스트 하기 총관리자 2016.08.02 97

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로