메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
	private static final long serialVersionUID = -2895832218133628236L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
	
	private final int NUM_THREADS = 3;		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
		try {
			avroDataSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		Properties properties = new Properties();
		
		//class name을 user_id, grup_id로 사용함
		properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
		properties.put("group.id",group_id);
		properties.put("zookeeper.session.timeout.ms", "6000");
		properties.put("zookeeper.sync.time.ms", "2000");
		properties.put("auto.commit.enable", "true");
		properties.put("auto.commit.interval.ms", "5000");
		properties.put("fetch.message.max.bytes", "31457280");		// 30MB		
		properties.put("auto.offset.reset", "smallest");
		
		final ConsumerConnector consumer = 
				Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(TOPIC, NUM_THREADS);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 	consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
		ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
		
		for (int m = 0; m < NUM_THREADS; m++) {
			executor.execute(new ConsumerT(streams.get(m)));
		}
		
	}
	
	public class ConsumerT implements Runnable {
		private KafkaStream<byte[], byte[]> stream;
		private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
		
		public ConsumerT(KafkaStream<byte[], byte[]> stream) {
			super();
			this.stream = stream;
		}
		
		@Override
		public void run(){
			for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
				
				StringBuffer sb = new StringBuffer();
					
				byte[] message = (byte[]) messageAndMetadata.message();
				
				BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
				COL_ONEM2M read = null;
				String task_group_id = "";
				String task_id =  "";
				String start_time =  "";
				String colFrom =  "";
				String calcuate_latest_yn =  "";
				 
				try {
					 read = specificDatumReader.read(null, binaryDecoder);
					 
					 List<java.lang.CharSequence> data= read.getData();
					 
					 task_group_id = read.getTaskGroupId().toString();
					 task_id = read.getTaskId().toString();
						
				// 처리에 필요한 로직
				// .....
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

번호 제목 날짜 조회 수
450 Caused by: java.sql.SQLNonTransientConnectionException: Could not read resultset: unexpected end of stream, read 0 bytes from 4 오류시 확인/조치할 내용 2016.10.31 6936
449 mybatis와 spring을 org.apache.commons.dbcp2.BasicDataSource의 DataSource로 연동할때 DB설정(참고) 2016.10.31 4324
448 How-to: Tune Your Apache Spark Jobs (Part 2) file 2016.10.31 3457
447 How-to: Build a Complex Event Processing App on Apache Spark and Drools file 2016.10.31 2849
446 Flume을 이용한 데이타 수집시 HBase write 성능 튜닝 file 2016.10.31 3112
445 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 2016.10.31 4339
444 Spark Streaming 코드레벨단에서의 성능개선 2016.10.31 3011
443 centos 6에서 mariadb 5.1 to 10.0 으로 upgrade 2016.11.01 2719
442 java스레드 덤프 분석하기 file 2016.11.03 2114
441 데이타 분석및 머신러닝에 도움이 도움이 되는 사이트 2016.11.04 3266
440 [SparkR]SparkR 설치 사용기 1 - Installation Guide On Yarn Cluster & Mesos Cluster & Stand Alone Cluster file 2016.11.04 2862
439 Eclipse실행시 Java was started but returned exit code=1이라는 오류가 발생할때 조치방법 2016.11.07 3369
438 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 2016.11.08 4373
437 참고할만한 spark예제를 설명하는 사이트 2016.11.11 3411
436 spark notebook 0.7.0설치및 설정 2016.11.14 3574
435 git 초기화(Windows에서 Git Bash사용) 2016.11.17 3607
434 특정 단계의 commit상태로 만들기(이렇게 하면 중간에 반영된 모든 commit를 history가 삭제된다) 2016.11.17 3391
433 Github를 이용하는 전체 흐름 이해하기 2016.11.18 2641
432 특정 커밋 시점(commit id를 기준으로)으로 돌리기(reset) 2016.11.21 3732
431 .gitignore파일에 지정되지 않은 파일이 ignore되는 경우 확인방법 2016.11.22 4422
위로