메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


0. topic은 동일한 데이타 구조를 가지는 개별로 topic을 가지고 있어야함

1. avro schema파일 준비(emp.avsc)

{

   "namespace": "w3ii.com",

   "type": "record",

   "name": "emp",

   "fields": [

      {"name": "name", "type": "string"},

      {"name": "id", "type": "int"},

      {"name": "salary", "type": "int"},

      {"name": "age", "type": "int"},

      {"name": "address", "type": "string"}

   ]

}


* 아래의 클래스 구조를 avro schema로 정의하는 샘플

class Child {

    String name;

}


class Parent {

    list<Child> children;

}


아래와 같이 정의해야 한다.------->

{

"name": "Parent",

"type":"record",

"fields":[

    {

        "name":"children",

        "type":{

            "type": "array",  

            "items":{

                        "name":"Child",

                        "type":"record",

                        "fields":[

                            {"name":"name", "type":"string"}

                        ]

                    }

            }

    }

}



2. avsc파일 컴파일

C:tmpavro-tools>java -jar avro-tools-1.7.7.jar compile schema emp.avsc .

Input files to compile:

  emp.avsc


* 컴파일 결과 생성파일 : C:tmpavro-toolsw3iicomemp.java


3. serializing/ deserializing

  가. serializing

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}


  나. deserializing

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();

topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);



4. producer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.io.IOException;

import java.util.Properties;


import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


import org.apache.avro.io.BinaryEncoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.specific.SpecificDatumWriter;

import org.apache.commons.io.IOUtils;

import org.apache.commons.io.output.ByteArrayOutputStream;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpEmitter {

public Producer<String, byte[]> producer;

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

/**

* broker주소

*/

private static String BROKER = "gsda1:9092,gsda2:9092,gsda3:9092";


public AvroEmpEmitter(String broker) {

Properties props = new Properties();

props.put("metadata.broker.list", broker);

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

props.put("request.required.acks", "1");

producer = new Producer<String, byte[]>(new ProducerConfig(props));

}

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}

public static void main(String[] args) {

AvroEmpEmitter avroOneM2MEmitter = new AvroEmpEmitter(BROKER);

// Emp전송

System.out.println("Send start(Emp)......................");

avroOneM2MEmitter.send(buildEmp());

System.out.println("Send end(Emp)......................");


// 전송끝 

avroOneM2MEmitter.close();


}

private void close() {

producer.close();

}


private static Emp buildEmp() {

Emp emp = new Emp();

emp.setId(1000);

emp.setName("이벤트명칭");

emp.setSalary(20);

emp.setAge(1);

emp.setAddress("여기는 주소입니다.");

return emp;

}

}


5. consumer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;


import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;


import org.apache.avro.io.BinaryDecoder;

import org.apache.avro.io.DecoderFactory;

import org.apache.avro.specific.SpecificDatumReader;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpSubscribe {

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

private static final String ZOOKEEPER_CONNECTION = "gsda1:2181,gsda2:2181,gsda3:2181";

@SuppressWarnings({ "rawtypes", "unchecked" })

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("zookeeper.connect",ZOOKEEPER_CONNECTION);

properties.put("group.id","testgroup_11");

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

   //properties.put("auto.commit.enable", "false"); 

properties.put("auto.commit.enable", "true"); 

                // 아래 값을 너무 짧게 설정하면 처리할 데이타가 많이 몰리는 경우 처리되지 않는 경우가 발생할 수 있음

// default값이 60000임

properties.put("auto.commit.interval.ms", "60000"); 

properties.put("fetch.message.max.bytes", "31457280"); // 30MB  

properties.put("auto.offset.reset", "smallest");

//properties.put("auto.offset.reset", "largest"); // 최근것부터 처리

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();


topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);

for (final KafkaStream stream : streams) {

ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

while (consumerIte.hasNext()) {

try {

MessageAndMetadata msg = consumerIte.next();

byte[] message = (byte[]) msg.message();

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);

Emp read = specificDatumReader.read(null, binaryDecoder);

System.out.println("Message from Topic("+TOPIC+") : " + read.toString());

System.out.println("Message(name) from Topic("+TOPIC+") : " + read.getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}

if (consumer != null) consumer.shutdown();

}

}



* 참고 : https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema

번호 제목 글쓴이 날짜 조회 수
241 Cannot create /var/run/oozie/oozie.pid: Directory nonexistent오류 총관리자 2014.06.03 479
240 DataSetCreator실행시 "Illegal character in fragment at index"오류가 나는 경우 조치방안 총관리자 2016.06.17 480
239 시스템날짜를 현재 정보로 동기화 하는 방법(rdate, ntpdate이용) 총관리자 2014.08.24 481
238 [dovecot]dovecot restart할때 root@gsda4:/usr/lib/dovecot# service dovecot restart 오류 발생시 조치사항 총관리자 2017.06.12 492
237 Incompatible clusterIDs오류 원인및 해결방법 총관리자 2016.04.01 493
236 원격지에서 zio공유기를 통해서 노트북의 mysql접속을 허용하는 방법 총관리자 2014.09.07 495
235 hive기동시 Caused by: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D 오류 발생시 조치사항 총관리자 2016.09.25 496
234 compile할때와 exclude할때 대상을 표현하는 명칭이 다르므로 주의할것 총관리자 2016.08.10 503
233 CDH에서 Sentry 개념및 설정 file 총관리자 2018.06.21 504
232 데이타 제공 사이트 링크 총관리자 2014.08.03 508
231 동시에 많은 요청이 endpoint로 몰려서java.net.NoRouteToHostException가 발생하는 경우의 처리방법 총관리자 2016.10.17 510
230 anaconda3 (v5.2) 설치및 머신러닝 관련 라이브러리 설치 절차 총관리자 2018.07.27 513
229 spark-submit 실행시 "java.lang.OutOfMemoryError: Java heap space"발생시 조치사항 총관리자 2018.02.01 517
228 Ubuntu 16.04 LTS에 4대에 Hadoop 2.8.0설치 총관리자 2017.05.01 521
227 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 총관리자 2016.11.08 529
226 hadoop의 data디렉토리를 변경하는 방법 총관리자 2014.08.24 536
225 [Kudu]ERROR: Unable to advance iterator for node with id '2' for Kudu table 'impala::core.pm0_abdasubjct': Network error: recv error from unknown peer: Transport endpoint is not connected (error 107) gooper 2023.03.16 536
224 spark client프로그램 기동시 "Error initializing SparkContext"오류 발생할때 조치사항 총관리자 2016.05.27 539
223 spark-shell을 실행하면 "Attempted to request executors before the AM has registered!"라는 오류가 발생하면 총관리자 2018.06.08 545
222 외부 기기(usb, 하드)등 mount(연결)하기 총관리자 2014.08.03 546

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로