메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


0. topic은 동일한 데이타 구조를 가지는 개별로 topic을 가지고 있어야함

1. avro schema파일 준비(emp.avsc)

{

   "namespace": "w3ii.com",

   "type": "record",

   "name": "emp",

   "fields": [

      {"name": "name", "type": "string"},

      {"name": "id", "type": "int"},

      {"name": "salary", "type": "int"},

      {"name": "age", "type": "int"},

      {"name": "address", "type": "string"}

   ]

}


* 아래의 클래스 구조를 avro schema로 정의하는 샘플

class Child {

    String name;

}


class Parent {

    list<Child> children;

}


아래와 같이 정의해야 한다.------->

{

"name": "Parent",

"type":"record",

"fields":[

    {

        "name":"children",

        "type":{

            "type": "array",  

            "items":{

                        "name":"Child",

                        "type":"record",

                        "fields":[

                            {"name":"name", "type":"string"}

                        ]

                    }

            }

    }

}



2. avsc파일 컴파일

C:tmpavro-tools>java -jar avro-tools-1.7.7.jar compile schema emp.avsc .

Input files to compile:

  emp.avsc


* 컴파일 결과 생성파일 : C:tmpavro-toolsw3iicomemp.java


3. serializing/ deserializing

  가. serializing

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}


  나. deserializing

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();

topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);



4. producer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.io.IOException;

import java.util.Properties;


import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


import org.apache.avro.io.BinaryEncoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.specific.SpecificDatumWriter;

import org.apache.commons.io.IOUtils;

import org.apache.commons.io.output.ByteArrayOutputStream;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpEmitter {

public Producer<String, byte[]> producer;

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

/**

* broker주소

*/

private static String BROKER = "gsda1:9092,gsda2:9092,gsda3:9092";


public AvroEmpEmitter(String broker) {

Properties props = new Properties();

props.put("metadata.broker.list", broker);

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

props.put("request.required.acks", "1");

producer = new Producer<String, byte[]>(new ProducerConfig(props));

}

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}

public static void main(String[] args) {

AvroEmpEmitter avroOneM2MEmitter = new AvroEmpEmitter(BROKER);

// Emp전송

System.out.println("Send start(Emp)......................");

avroOneM2MEmitter.send(buildEmp());

System.out.println("Send end(Emp)......................");


// 전송끝 

avroOneM2MEmitter.close();


}

private void close() {

producer.close();

}


private static Emp buildEmp() {

Emp emp = new Emp();

emp.setId(1000);

emp.setName("이벤트명칭");

emp.setSalary(20);

emp.setAge(1);

emp.setAddress("여기는 주소입니다.");

return emp;

}

}


5. consumer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;


import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;


import org.apache.avro.io.BinaryDecoder;

import org.apache.avro.io.DecoderFactory;

import org.apache.avro.specific.SpecificDatumReader;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpSubscribe {

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

private static final String ZOOKEEPER_CONNECTION = "gsda1:2181,gsda2:2181,gsda3:2181";

@SuppressWarnings({ "rawtypes", "unchecked" })

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("zookeeper.connect",ZOOKEEPER_CONNECTION);

properties.put("group.id","testgroup_11");

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

   //properties.put("auto.commit.enable", "false"); 

properties.put("auto.commit.enable", "true"); 

                // 아래 값을 너무 짧게 설정하면 처리할 데이타가 많이 몰리는 경우 처리되지 않는 경우가 발생할 수 있음

// default값이 60000임

properties.put("auto.commit.interval.ms", "60000"); 

properties.put("fetch.message.max.bytes", "31457280"); // 30MB  

properties.put("auto.offset.reset", "smallest");

//properties.put("auto.offset.reset", "largest"); // 최근것부터 처리

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();


topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);

for (final KafkaStream stream : streams) {

ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

while (consumerIte.hasNext()) {

try {

MessageAndMetadata msg = consumerIte.next();

byte[] message = (byte[]) msg.message();

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);

Emp read = specificDatumReader.read(null, binaryDecoder);

System.out.println("Message from Topic("+TOPIC+") : " + read.toString());

System.out.println("Message(name) from Topic("+TOPIC+") : " + read.getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}

if (consumer != null) consumer.shutdown();

}

}



* 참고 : https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema

번호 제목 글쓴이 날짜 조회 수
380 index생성, 삭제, 활용 총관리자 2014.04.25 1702
379 갑자기 DataNode가 java.io.IOException: Premature EOF from inputStream를 반복적으로 발생시키다가 java.lang.OutOfMemoryError: Java heap space를 내면서 죽는 경우 조치방법 총관리자 2017.07.19 1685
378 hue db에서 사용자가 가지는 정보 확인 총관리자 2020.02.10 1644
377 Cloudera Manager설치및 Uninstall 방법(순서) 총관리자 2018.05.28 1644
376 impald에서 idle_query_timeout 와 idle_session_timeout 구분 총관리자 2021.05.20 1630
375 sqoop에서 oracle관련 작업할때 테이블명, 사용자명, DB명은 모두 대문자로 사용할것 총관리자 2014.05.15 1528
374 physical memory used되면서 mapper가 kill되는 경우 오류 발생시 조치 총관리자 2018.09.20 1522
373 FAILED: IllegalStateException Variable substitution depth too large: 40 오류발생시 조치사항 총관리자 2014.08.19 1521
372 Journal Storage Directory /data/hadoop/journal/data/mycluster not formatted 오류시 조치사항 총관리자 2016.07.29 1518
371 hiverserver2기동시 connection refused가 발생하는 경우 조치방법 총관리자 2014.05.22 1472
370 oozie의 meta정보를 mysql에서 관리하기 총관리자 2014.05.26 1466
369 flume 1.5.2 설치및 테스트(source : file, sink : hdfs) in HA 총관리자 2015.05.21 1415
368 ExWordCount jar파일 file 구퍼 2013.03.06 1336
367 Oozie 설치, 환경설정 및 테스트 총관리자 2014.04.08 1293
» avro 사용하기(avsc 스키마 파일 컴파일 방법, consumer, producer샘플소스) 총관리자 2016.07.08 1278
365 schema설정없이 hive를 최초에 실행했을때 발생하는 오류메세지및 처리방법 총관리자 2016.09.25 1226
364 json 값 다루기 총관리자 2014.04.17 1222
363 upsert구현방법(년-월-일 파티션을 기준으로) 및 테스트 script file 총관리자 2018.07.03 1221
362 zookeeper 3.4.6 설치(3대) 총관리자 2015.04.28 1210
361 resouce manager에 dr.who가 아닌 다른 사용자로 로그인 하기 총관리자 2018.06.28 1207

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로