메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
private static final long serialVersionUID = -2895832218133628236L;
private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);

private final int NUM_THREADS = 3;
private final String user_id =this.getClass().getName();
private final String group_id = this.getClass().getSimpleName();

public static void main(String[] args) {
AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
try {
avroDataSubscribe.collect();
} catch (Exception ex) {
log.debug("exception in main() :"+ex.getStackTrace());
}
}

public void collect() throws Exception{
Properties properties = new Properties();

//class name을 user_id, grup_id로 사용함
properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
properties.put("group.id",group_id);
properties.put("zookeeper.session.timeout.ms", "6000");
properties.put("zookeeper.sync.time.ms", "2000");
properties.put("auto.commit.enable", "true");
properties.put("auto.commit.interval.ms", "5000");
properties.put("fetch.message.max.bytes", "31457280"); // 30MB
properties.put("auto.offset.reset", "smallest");

final ConsumerConnector consumer = 
Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, NUM_THREADS);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =  consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

for (int m = 0; m < NUM_THREADS; m++) {
executor.execute(new ConsumerT(streams.get(m)));
}

}

public class ConsumerT implements Runnable {
private KafkaStream<byte[], byte[]> stream;
private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);

public ConsumerT(KafkaStream<byte[], byte[]> stream) {
super();
this.stream = stream;
}

@Override
public void run(){
for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {

StringBuffer sb = new StringBuffer();

byte[] message = (byte[]) messageAndMetadata.message();

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
COL_ONEM2M read = null;
String task_group_id = "";
String task_id =  "";
String start_time =  "";
String colFrom =  "";
String calcuate_latest_yn =  "";
 
try {
 read = specificDatumReader.read(null, binaryDecoder);
 
 List<java.lang.CharSequence> data= read.getData();
 
 task_group_id = read.getTaskGroupId().toString();
 task_id = read.getTaskId().toString();

// 처리에 필요한 로직
// .....
} catch (Exception e) {
e.printStackTrace();
}
}
}

}
}

번호 제목 날짜 조회 수
757 hue.axes_accessattempt테이블의 username컬럼에 NULL 혹은 space가 들어갈수도 있음. 2021.11.03 92375
756 oozie의 meta정보를 mysql에서 관리하기 2014.05.26 27762
755 bananapi 5대(ubuntu계열 리눅스)에 yarn(hadoop 2.6.0)설치하기-ResourceManager HA/HDFS HA포함, JobHistory포함 2015.04.24 25708
754 mapreduce appliction을 실행시 "is running beyond virtual memory limits" 오류 발생시 조치사항 2017.05.04 22844
753 org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /tmp/hadoop-root/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible. 2013.03.11 18508
752 Cloudera의 API를 이용하여 impala의 실행되었던 쿼리 확인하는 예시 2018.05.03 16984
751 System Properties Comparison Elasticsearch vs. Hive vs. Jena file 2016.03.10 16165
750 Cloudera Hadoop and Spark Developer Certification 준비(참고) 2018.05.16 15602
749 Hive Query Examples from test code (2 of 2) 2014.03.26 14815
748 Resource temporarily unavailable(자원이 일시적으로 사용 불가능함) 오류조치 2015.11.19 13244
747 drop table로 삭제했으나 tablet server에는 여전히 존재하는 테이블 삭제방법 2021.07.09 12648
746 [DataNode]org.apache.hadoop.security.KerberosAuthException: failure to login: for principal: hdfs/datanode03@GOOPER.COM from keytab hdfs.keytab오류 2023.04.18 12220
745 insert hbase by hive ... error occured after 5 hours..HMaster가 뜨지 않는 장애에 대한 복구 방법 2014.04.29 11966
744 [Decommission]시 시간이 많이 걸리면서(수일) Decommission이 완료되지 않는 경우 조치 2018.01.03 11814
743 mysql 5.5.34-0ubuntu0.13.04용 설치/진행 화면 2014.09.10 11489
742 hive job실행시 meta정보를 원격의 mysql에 저장하는 경우 설정방법 2014.05.28 11293
741 hive 2.0.1 설치및 mariadb로 metastore 설정 2016.06.03 11129
740 cumulusRDF 1.0.1설치및 "KeyspaceCumulus" keyspace확인하기 file 2016.04.15 11112
739 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 2017.07.26 11060
738 jupyter, zeppelin, rstudio를 이용하여 spark cluster에 job를 실행시키기 위한 정보 2018.04.13 10562
위로