메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


0. topic은 동일한 데이타 구조를 가지는 개별로 topic을 가지고 있어야함

1. avro schema파일 준비(emp.avsc)

{

   "namespace": "w3ii.com",

   "type": "record",

   "name": "emp",

   "fields": [

      {"name": "name", "type": "string"},

      {"name": "id", "type": "int"},

      {"name": "salary", "type": "int"},

      {"name": "age", "type": "int"},

      {"name": "address", "type": "string"}

   ]

}


* 아래의 클래스 구조를 avro schema로 정의하는 샘플

class Child {

    String name;

}


class Parent {

    list<Child> children;

}


아래와 같이 정의해야 한다.------->

{

"name": "Parent",

"type":"record",

"fields":[

    {

        "name":"children",

        "type":{

            "type": "array",  

            "items":{

                        "name":"Child",

                        "type":"record",

                        "fields":[

                            {"name":"name", "type":"string"}

                        ]

                    }

            }

    }

}



2. avsc파일 컴파일

C:tmpavro-tools>java -jar avro-tools-1.7.7.jar compile schema emp.avsc .

Input files to compile:

  emp.avsc


* 컴파일 결과 생성파일 : C:tmpavro-toolsw3iicomemp.java


3. serializing/ deserializing

  가. serializing

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}


  나. deserializing

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();

topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);



4. producer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.io.IOException;

import java.util.Properties;


import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


import org.apache.avro.io.BinaryEncoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.specific.SpecificDatumWriter;

import org.apache.commons.io.IOUtils;

import org.apache.commons.io.output.ByteArrayOutputStream;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpEmitter {

public Producer<String, byte[]> producer;

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

/**

* broker주소

*/

private static String BROKER = "gsda1:9092,gsda2:9092,gsda3:9092";


public AvroEmpEmitter(String broker) {

Properties props = new Properties();

props.put("metadata.broker.list", broker);

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

props.put("request.required.acks", "1");

producer = new Producer<String, byte[]>(new ProducerConfig(props));

}

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}

public static void main(String[] args) {

AvroEmpEmitter avroOneM2MEmitter = new AvroEmpEmitter(BROKER);

// Emp전송

System.out.println("Send start(Emp)......................");

avroOneM2MEmitter.send(buildEmp());

System.out.println("Send end(Emp)......................");


// 전송끝 

avroOneM2MEmitter.close();


}

private void close() {

producer.close();

}


private static Emp buildEmp() {

Emp emp = new Emp();

emp.setId(1000);

emp.setName("이벤트명칭");

emp.setSalary(20);

emp.setAge(1);

emp.setAddress("여기는 주소입니다.");

return emp;

}

}


5. consumer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;


import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;


import org.apache.avro.io.BinaryDecoder;

import org.apache.avro.io.DecoderFactory;

import org.apache.avro.specific.SpecificDatumReader;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpSubscribe {

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

private static final String ZOOKEEPER_CONNECTION = "gsda1:2181,gsda2:2181,gsda3:2181";

@SuppressWarnings({ "rawtypes", "unchecked" })

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("zookeeper.connect",ZOOKEEPER_CONNECTION);

properties.put("group.id","testgroup_11");

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

   //properties.put("auto.commit.enable", "false"); 

properties.put("auto.commit.enable", "true"); 

                // 아래 값을 너무 짧게 설정하면 처리할 데이타가 많이 몰리는 경우 처리되지 않는 경우가 발생할 수 있음

// default값이 60000임

properties.put("auto.commit.interval.ms", "60000"); 

properties.put("fetch.message.max.bytes", "31457280"); // 30MB  

properties.put("auto.offset.reset", "smallest");

//properties.put("auto.offset.reset", "largest"); // 최근것부터 처리

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();


topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);

for (final KafkaStream stream : streams) {

ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

while (consumerIte.hasNext()) {

try {

MessageAndMetadata msg = consumerIte.next();

byte[] message = (byte[]) msg.message();

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);

Emp read = specificDatumReader.read(null, binaryDecoder);

System.out.println("Message from Topic("+TOPIC+") : " + read.toString());

System.out.println("Message(name) from Topic("+TOPIC+") : " + read.getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}

if (consumer != null) consumer.shutdown();

}

}



* 참고 : https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema

번호 제목 글쓴이 날짜 조회 수
641 로그 파일에 대해 Elasticsearch 사용하기 총관리자 2014.09.25 1121
640 RHadoop을 통해서 웹사이트의 방문자수를 예측하는 방법 총관리자 2014.09.26 2007
639 solr에서 한글사용시 주의점 총관리자 2014.09.26 599
638 solr설치및 적용관련 file 총관리자 2014.09.27 2124
637 solr vs elasticsearch 비교2 총관리자 2014.09.29 1276
636 [번역] solr 검색 엔진 튜토리얼 총관리자 2014.10.07 441
635 Using The ZooKeeper CLI에서 zkCli의 위치 총관리자 2014.11.02 881
634 kafka의 re-balance를 이용하여 consumer를 multi thread로 돌려서 topic의 partitions을 활용 총관리자 2015.03.31 248
633 SASL configuration failed: javax.security.auth.login.LoginException: java.lang.NullPointerException 오류 해결방법 총관리자 2015.04.02 701
632 scan의 startrow, stoprow지정하는 방법 총관리자 2015.04.08 375
631 bananapi 5대(ubuntu계열 리눅스)에 yarn(hadoop 2.6.0)설치하기-ResourceManager HA/HDFS HA포함, JobHistory포함 총관리자 2015.04.24 19145
630 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable원인 총관리자 2015.04.27 322
629 zookeeper 3.4.6 설치(3대) 총관리자 2015.04.28 1210
628 Hadoop - 클러스터 세팅및 기동 총관리자 2015.04.28 427
627 HBase 0.98.12(1.2.5) for hadoop2 설치-5대에 완전분산모드 (HDFS HA상테) 총관리자 2015.04.29 1047
626 hive 0.13.1 설치 + meta정보는 postgresql 9.3에 저장 총관리자 2015.04.30 227
625 oozie 4.1 설치 - maven을 이용한 source compile on hadoop 2.5.2 with postgresql 9.3 총관리자 2015.04.30 862
624 hadoop 2.6.0 기동(에코시스템 포함)및 wordcount 어플리케이션을 이용한 테스트 총관리자 2015.05.05 3770
623 java.lang.IllegalArgumentException: Does not contain a valid host:port authority: master 오류해결방법 총관리자 2015.05.06 451
622 znode /hbase recursive하게 지우기 총관리자 2015.05.06 673

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로