메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


0. topic은 동일한 데이타 구조를 가지는 개별로 topic을 가지고 있어야함

1. avro schema파일 준비(emp.avsc)

{

   "namespace": "w3ii.com",

   "type": "record",

   "name": "emp",

   "fields": [

      {"name": "name", "type": "string"},

      {"name": "id", "type": "int"},

      {"name": "salary", "type": "int"},

      {"name": "age", "type": "int"},

      {"name": "address", "type": "string"}

   ]

}


* 아래의 클래스 구조를 avro schema로 정의하는 샘플

class Child {

    String name;

}


class Parent {

    list<Child> children;

}


아래와 같이 정의해야 한다.------->

{

"name": "Parent",

"type":"record",

"fields":[

    {

        "name":"children",

        "type":{

            "type": "array",  

            "items":{

                        "name":"Child",

                        "type":"record",

                        "fields":[

                            {"name":"name", "type":"string"}

                        ]

                    }

            }

    }

}



2. avsc파일 컴파일

C:tmpavro-tools>java -jar avro-tools-1.7.7.jar compile schema emp.avsc .

Input files to compile:

  emp.avsc


* 컴파일 결과 생성파일 : C:tmpavro-toolsw3iicomemp.java


3. serializing/ deserializing

  가. serializing

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}


  나. deserializing

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();

topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);



4. producer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.io.IOException;

import java.util.Properties;


import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


import org.apache.avro.io.BinaryEncoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.specific.SpecificDatumWriter;

import org.apache.commons.io.IOUtils;

import org.apache.commons.io.output.ByteArrayOutputStream;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpEmitter {

public Producer<String, byte[]> producer;

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

/**

* broker주소

*/

private static String BROKER = "gsda1:9092,gsda2:9092,gsda3:9092";


public AvroEmpEmitter(String broker) {

Properties props = new Properties();

props.put("metadata.broker.list", broker);

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

props.put("request.required.acks", "1");

producer = new Producer<String, byte[]>(new ProducerConfig(props));

}

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}

public static void main(String[] args) {

AvroEmpEmitter avroOneM2MEmitter = new AvroEmpEmitter(BROKER);

// Emp전송

System.out.println("Send start(Emp)......................");

avroOneM2MEmitter.send(buildEmp());

System.out.println("Send end(Emp)......................");


// 전송끝 

avroOneM2MEmitter.close();


}

private void close() {

producer.close();

}


private static Emp buildEmp() {

Emp emp = new Emp();

emp.setId(1000);

emp.setName("이벤트명칭");

emp.setSalary(20);

emp.setAge(1);

emp.setAddress("여기는 주소입니다.");

return emp;

}

}


5. consumer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;


import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;


import org.apache.avro.io.BinaryDecoder;

import org.apache.avro.io.DecoderFactory;

import org.apache.avro.specific.SpecificDatumReader;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpSubscribe {

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

private static final String ZOOKEEPER_CONNECTION = "gsda1:2181,gsda2:2181,gsda3:2181";

@SuppressWarnings({ "rawtypes", "unchecked" })

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("zookeeper.connect",ZOOKEEPER_CONNECTION);

properties.put("group.id","testgroup_11");

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

   //properties.put("auto.commit.enable", "false"); 

properties.put("auto.commit.enable", "true"); 

                // 아래 값을 너무 짧게 설정하면 처리할 데이타가 많이 몰리는 경우 처리되지 않는 경우가 발생할 수 있음

// default값이 60000임

properties.put("auto.commit.interval.ms", "60000"); 

properties.put("fetch.message.max.bytes", "31457280"); // 30MB  

properties.put("auto.offset.reset", "smallest");

//properties.put("auto.offset.reset", "largest"); // 최근것부터 처리

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();


topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);

for (final KafkaStream stream : streams) {

ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

while (consumerIte.hasNext()) {

try {

MessageAndMetadata msg = consumerIte.next();

byte[] message = (byte[]) msg.message();

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);

Emp read = specificDatumReader.read(null, binaryDecoder);

System.out.println("Message from Topic("+TOPIC+") : " + read.toString());

System.out.println("Message(name) from Topic("+TOPIC+") : " + read.getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}

if (consumer != null) consumer.shutdown();

}

}



* 참고 : https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema

번호 제목 글쓴이 날짜 조회 수
237 org.apache.hadoop.hbase.ClockOutOfSyncException: org.apache.hadoop.hbase.ClockOutOfSyncException 오류시 조치사항 총관리자 2016.07.14 62
236 kafka로 부터 메세지를 stream으로 받아 처리하는 spark샘플소스(spark의 producer와 consumer를 sbt로 컴파일 하고 서버에서 spark-submit하는 방법) 총관리자 2016.07.13 628
235 [sbt] sbt-assembly를 이용하여 실행에 필요한 모든 j라이브러리를 포함한 fat jar파일 만들기 총관리자 2016.07.11 1736
234 [sbt] sbt 0.13.11 를 windows에 설치하고 scala프로그램을 compile해서 jar파일 만들기 총관리자 2016.07.11 73
» avro 사용하기(avsc 스키마 파일 컴파일 방법, consumer, producer샘플소스) 총관리자 2016.07.08 1264
232 DataSetCreator실행시 "Illegal character in fragment at index"오류가 나는 경우 조치방안 총관리자 2016.06.17 480
231 5건의 triple data를 이용하여 특정 작업 폴더에서 작업하는 방법/절차 총관리자 2016.06.16 36
230 queryTranslator실행시 NullPointerException가 발생전에 java.lang.ArrayIndexOutOfBoundsException발생시 조치사항 총관리자 2016.06.16 58
229 S2RDF를 실행부분만 추출하여 1건의 triple data를 HDFS에 등록, sparql을 sql로 변환, sql실행하는 방법및 S2RDF소스 컴파일 방법 총관리자 2016.06.15 410
228 S2RDF모듈의 실행부분만 추출하여 별도록 실행하는 방법(draft) 총관리자 2016.06.14 36
227 spark-sql실행시 ERROR log: Got exception: java.lang.NumberFormatException For input string: "2000ms" 오류발생시 조치사항 총관리자 2016.06.09 167
226 spark-sql실행시 Caused by: java.lang.NumberFormatException: For input string: "0s" 오류발생시 조치사항 총관리자 2016.06.09 2802
225 spark-sql실행시 The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH오류 발생시 조치사항 총관리자 2016.06.09 403
224 ./spark-sql 실행시 "java.lang.NumberFormatException: For input string: "1s"오류발생시 조치사항 총관리자 2016.06.09 127
223 beeline실행시 User: root is not allowed to impersonate오류 발생시 조치사항 총관리자 2016.06.03 744
222 Caused by: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D오류발생시 조치사항 총관리자 2016.06.03 1141
221 impala 설치/설정 총관리자 2016.06.03 1028
220 hive 2.0.1 설치및 mariadb로 metastore 설정 총관리자 2016.06.03 5176
219 Windows에서 sbt개발환경 구축 방법(링크) 총관리자 2016.06.02 54
218 "암은 평범한 병, 심호흡만 잘해도 암세포 분열 저지” 총관리자 2016.06.02 48

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로