메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
	private static final long serialVersionUID = -2895832218133628236L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
	
	private final int NUM_THREADS = 3;		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
		try {
			avroDataSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		Properties properties = new Properties();
		
		//class name을 user_id, grup_id로 사용함
		properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
		properties.put("group.id",group_id);
		properties.put("zookeeper.session.timeout.ms", "6000");
		properties.put("zookeeper.sync.time.ms", "2000");
		properties.put("auto.commit.enable", "true");
		properties.put("auto.commit.interval.ms", "5000");
		properties.put("fetch.message.max.bytes", "31457280");		// 30MB		
		properties.put("auto.offset.reset", "smallest");
		
		final ConsumerConnector consumer = 
				Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(TOPIC, NUM_THREADS);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 	consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
		ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
		
		for (int m = 0; m < NUM_THREADS; m++) {
			executor.execute(new ConsumerT(streams.get(m)));
		}
		
	}
	
	public class ConsumerT implements Runnable {
		private KafkaStream<byte[], byte[]> stream;
		private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
		
		public ConsumerT(KafkaStream<byte[], byte[]> stream) {
			super();
			this.stream = stream;
		}
		
		@Override
		public void run(){
			for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
				
				StringBuffer sb = new StringBuffer();
					
				byte[] message = (byte[]) messageAndMetadata.message();
				
				BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
				COL_ONEM2M read = null;
				String task_group_id = "";
				String task_id =  "";
				String start_time =  "";
				String colFrom =  "";
				String calcuate_latest_yn =  "";
				 
				try {
					 read = specificDatumReader.read(null, binaryDecoder);
					 
					 List<java.lang.CharSequence> data= read.getData();
					 
					 task_group_id = read.getTaskGroupId().toString();
					 task_id = read.getTaskId().toString();
						
				// 처리에 필요한 로직
				// .....
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

번호 제목 글쓴이 날짜 조회 수
260 hive metadata(hive, impala, kudu 정보가 있음) 테이블에서 db, table, owner, location를 조회하는 쿼리 총관리자 2020.02.07 380
259 namenode오류 복구시 사용하는 명령 총관리자 2016.04.01 377
258 scan의 startrow, stoprow지정하는 방법 총관리자 2015.04.08 375
257 Namenode Metadata백업하는 방법 총관리자 2020.02.10 374
256 HUE를 사용할 사용자를 추가 하는 절차 총관리자 2018.05.29 367
255 hadoop클러스터를 구성하던 서버중 HA를 담당하는 서버의 hostname등이 변경되어 문제가 발생했을때 조치사항 총관리자 2016.07.29 363
254 root계정으로 MariaDB설치후 mysql -u root -p로 db에 접근하여 바로 해줘야 하는일..(케릭터셑은 utf8) 총관리자 2015.10.02 361
253 HDFS상의 /tmp폴더에 Permission denied오류가 발생시 조치사항 총관리자 2017.01.25 360
252 기준일자 이전의 hdfs 데이타를 지우는 shellscript 샘플 총관리자 2019.06.14 359
251 TransmitData() to failed: Network error: Recv() got EOF from remote (error 108) 오류 현상 총관리자 2019.02.15 359
250 impala,hive및 hdfs만 접근가능하고 파일을 이용한 테이블생성가능하도록 hue 권한설정설정 총관리자 2018.09.17 357
249 Hive MetaStore Server기동시 Could not create "increment"/"table" value-generation container SEQUENCE_TABLE since autoCreate flags do not allow it. 오류발생시 조치사항 총관리자 2017.05.03 349
248 [kudu]테이블 drop이 안되고 timeout이 걸리는 경우 조치 방법 총관리자 2020.06.08 348
247 Apache Spark와 Drools를 이용한 CEP구현 테스트 총관리자 2016.07.15 342
246 [sqoop] mapper를 2이상으로 설정하기 위한 split-by컬럼을 찾을때 유용하게 활용할 수 있는 쿼리 총관리자 2020.05.13 340
245 sentry설정후 beeline으로 hive2server에 접속하여 admin계정에 admin권한 부여하기 총관리자 2018.07.03 336
244 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 총관리자 2017.05.02 336
243 Cloudera가 사용하는 서비스별 포트 총관리자 2018.03.29 326
242 cloudera-scm-agent 설정파일 위치및 재시작 명령문 총관리자 2018.03.29 325
241 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 총관리자 2017.04.06 325

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로