메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


1. 다운로드

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz


2. upload및 압축풀기

  가. tar xvfz kafka_2.11-0.9.0.1.tgz

  나. ln -s kafka_2.11-0.9.0.1 kafka


3. properties파일 수정(zookeeper는 별도로 기동중인 상태임)

  가. config/consumer.properties : zookeeper.connect=sda1:2181,sda2:2181,so1:2181

  나. config/producter.properties : metadata.broker.list=sda1:9092,sda2:9092,so1:9092

  다. config/server.properties를 broker개수 만큼 복사하고 아래의 내용을 수정한다.

   (예, broker가 3개인경우 는server-1.properties, server-2.properties, server-3.properties의 

       이름을 각각 가지는 설정파일을 3개 만든다.)

    - broker.id=1 : broker마다 unique한 값

    - port=9092

    - host.name=sda1  : 각 서버의 고정ip

    - log.dirs=/logs/kafka/kafka-logs

    - zookeeper.connect=sda1:2181,sda2:2181,so1:2181


4. broker로 사용할 서버에 각각 복사(sda1, sda2, so1)

  scp -r -P 22 kafka_2.11-0.9.0.1 root@sda2:$HOME

  

5. broker노드로 사용할 각 서버에서 kafka server를 기동한다.(root로 각 서버에서 실행)

  가. ln -s kafka_2.11-0.9.0.1 kafka

  나. bin/kafka-server-start.sh config/server-1.properties &  (해당 서버에서 실행)

      (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-1.properties &

       와 같이 실행한다.)


  나-1. bin/kafka-server-start.sh config/server-2.properties &  (해당 서버에서 실행)

       (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-2.properties &

       와 같이 실행한다.)

  나-2. bin/kafka-server-start.sh config/server-3.properties &  (해당 서버에서 실행)

       (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-3.properties &

       와 같이 실행한다.)


  * 서버중지 : bin/kafka-server-stop.sh

  * 데몬확인(jps -m) : 81589 Kafka config/server-1.properties

 

6. topic관리(토픽명 : test-topic)

 가. 생성

  * bin/kafka-topics.sh --create --zookeeper sda1:2181,sda2:2181,so1:2181 --replication-factor 3 --partitions 3 --topic test-topic


 나. 목록

 * topic목록 확인 : bin/kafka-topics.sh --list --zookeeper sda1:2181,sda2:2181,so1:2181


 다. topic정보

 * topic정보 : bin/kafka-topics.sh --describe --zookeeper sda1:2181,sda2:2181,so1:2181


 라. topic삭제

 * topic삭제 : bin/kafka-topics.sh --delete --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COLLECT_ONEM2M 

 * 지정topic(예, COL_TEST)에 대한 replica와 partition정보 확인 :

     ./kafka-topics.sh --describe  --zookeeper sda1:2181 --topic COL_TEST


 마. topic설정 정보 변경

 * topic 설정변경(partitions의 수를 5로 변경하는 경우) : bin/kafka-topics.sh --alter --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COL_TEST --partitions 5


 바.topic 생성시 주의사항 

    존재하지 않은 topic에 대하여 메시지를 생산하거나 소비할 경우 broker의 설정값에 따라 디폴트 설정으로 topic을 자동생성될 수 있다. (자동생성하지 않도록 하려면 auto.create.topics.enable를 false로 설정한다)


 - broker관련 기본적으로 설정해야 하는 값(Kafka API사용시 conf설정)

   auto.create.topics.enable=true

   num.partitions=1   

   default.replication.factor=1

   delete.topic.enable=false


   #30MB(전송가능량, Kafka API사용시 conf설정)

   message.max.bytes=31457280

   replica.fetch.max.bytes=31457280


8. test

  가. producer기동

    bin/kafka-console-producer.sh --broker-list sda1:9092,sda2:9092,so1:9092 --topic test-topic

  나. consumer기동

    bin/kafka-console-consumer.sh --zookeeper sda1:2181,sda2:2181,so1:2181 --from-beginning --topic test-topic


  * producer의 console에서 텍스트 입력후 consumer에서 data가 보이면 정상적으로 작동되는것임


9. Kafka API를 이용하여 consumer프로그램 만들때 conf는 아래와 같이 설정해준다.

properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);

properties.put("group.id",user_id);

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

properties.put("auto.commit.interval.ms", "1000");

properties.put("fetch.message.max.bytes", "31457280"); // 30MB 전송가능함

properties.put("auto.offset.reset", "smallest");

번호 제목 글쓴이 날짜 조회 수
39 import 혹은 export할때 hive파일의 default 구분자는 --input-fields-terminated-by "x01"와 같이 지정해야함 총관리자 2014.05.20 4244
38 sqoop작업시 hdfs의 개수보다 더많은 값이 중복되어 oracle에 입력되는 경우가 있음 총관리자 2014.09.02 4093
37 다수의 로그 에이전트로 부터 로그를 받아 각각의 파일로 저장하는 방법(interceptor및 multiplexing) 총관리자 2014.04.04 4089
36 sqoop 1.4.4 설치및 테스트 총관리자 2014.04.21 3134
35 kafka broker기동시 brokerId가 달라서 기동에 실패하는 경우 조치방법 총관리자 2016.05.02 2328
34 hadoop 2.6.0에 sqoop2 (1.99.5) server및 client설치 == fail 총관리자 2015.06.11 1770
33 sqoop에서 oracle관련 작업할때 테이블명, 사용자명, DB명은 모두 대문자로 사용할것 총관리자 2014.05.15 1527
32 flume 1.5.2 설치및 테스트(source : file, sink : hdfs) in HA 총관리자 2015.05.21 1415
31 avro 사용하기(avsc 스키마 파일 컴파일 방법, consumer, producer샘플소스) 총관리자 2016.07.08 1267
30 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 총관리자 2016.10.31 1020
29 oozie 에서 sqoop action실행 에러 - 컬럼개수 차이 총관리자 2014.07.17 1002
28 동일서버에서 LA와 LC동시에 기동하여 테스트 총관리자 2014.04.01 928
27 sqoop export/import등을 할때 driver를 못찾는 오류가 발생하면... 총관리자 2014.05.15 863
26 source의 type을 spooldir로 하는 경우 해당 경로에 파일이 들어오면 파일단위로 전송함 총관리자 2014.05.20 687
25 sqoop으로 mariadb에 접근해서 hive 테이블로 자동으로 생성하기 총관리자 2018.08.03 670
24 kafka-manager 1.3.3.4 설정및 실행하기 총관리자 2017.03.20 617
23 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 총관리자 2016.11.08 526
22 java.util.NoSuchElementException발생시 조치 총관리자 2014.08.27 476
21 컬럼및 라인의 구분자를 지정하여 sqoop으로 데이타를 가져오고 hive테이블을 생성하는 명령문 총관리자 2018.08.03 418
» kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 총관리자 2016.05.02 412

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로