메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


1. 다운로드

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz


2. upload및 압축풀기

  가. tar xvfz kafka_2.11-0.9.0.1.tgz

  나. ln -s kafka_2.11-0.9.0.1 kafka


3. properties파일 수정(zookeeper는 별도로 기동중인 상태임)

  가. config/consumer.properties : zookeeper.connect=sda1:2181,sda2:2181,so1:2181

  나. config/producter.properties : metadata.broker.list=sda1:9092,sda2:9092,so1:9092

  다. config/server.properties를 broker개수 만큼 복사하고 아래의 내용을 수정한다.

   (예, broker가 3개인경우 는server-1.properties, server-2.properties, server-3.properties의 

       이름을 각각 가지는 설정파일을 3개 만든다.)

    - broker.id=1 : broker마다 unique한 값

    - port=9092

    - host.name=sda1  : 각 서버의 고정ip

    - log.dirs=/logs/kafka/kafka-logs

    - zookeeper.connect=sda1:2181,sda2:2181,so1:2181


4. broker로 사용할 서버에 각각 복사(sda1, sda2, so1)

  scp -r -P 22 kafka_2.11-0.9.0.1 root@sda2:$HOME

  

5. broker노드로 사용할 각 서버에서 kafka server를 기동한다.(root로 각 서버에서 실행)

  가. ln -s kafka_2.11-0.9.0.1 kafka

  나. bin/kafka-server-start.sh config/server-1.properties &  (해당 서버에서 실행)

      (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-1.properties &

       와 같이 실행한다.)


  나-1. bin/kafka-server-start.sh config/server-2.properties &  (해당 서버에서 실행)

       (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-2.properties &

       와 같이 실행한다.)

  나-2. bin/kafka-server-start.sh config/server-3.properties &  (해당 서버에서 실행)

       (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-3.properties &

       와 같이 실행한다.)


  * 서버중지 : bin/kafka-server-stop.sh

  * 데몬확인(jps -m) : 81589 Kafka config/server-1.properties

 

6. topic관리(토픽명 : test-topic)

 가. 생성

  * bin/kafka-topics.sh --create --zookeeper sda1:2181,sda2:2181,so1:2181 --replication-factor 3 --partitions 3 --topic test-topic


 나. 목록

 * topic목록 확인 : bin/kafka-topics.sh --list --zookeeper sda1:2181,sda2:2181,so1:2181


 다. topic정보

 * topic정보 : bin/kafka-topics.sh --describe --zookeeper sda1:2181,sda2:2181,so1:2181


 라. topic삭제

 * topic삭제 : bin/kafka-topics.sh --delete --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COLLECT_ONEM2M 

 * 지정topic(예, COL_TEST)에 대한 replica와 partition정보 확인 :

     ./kafka-topics.sh --describe  --zookeeper sda1:2181 --topic COL_TEST


 마. topic설정 정보 변경

 * topic 설정변경(partitions의 수를 5로 변경하는 경우) : bin/kafka-topics.sh --alter --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COL_TEST --partitions 5


 바.topic 생성시 주의사항 

    존재하지 않은 topic에 대하여 메시지를 생산하거나 소비할 경우 broker의 설정값에 따라 디폴트 설정으로 topic을 자동생성될 수 있다. (자동생성하지 않도록 하려면 auto.create.topics.enable를 false로 설정한다)


 - broker관련 기본적으로 설정해야 하는 값(Kafka API사용시 conf설정)

   auto.create.topics.enable=true

   num.partitions=1   

   default.replication.factor=1

   delete.topic.enable=false


   #30MB(전송가능량, Kafka API사용시 conf설정)

   message.max.bytes=31457280

   replica.fetch.max.bytes=31457280


8. test

  가. producer기동

    bin/kafka-console-producer.sh --broker-list sda1:9092,sda2:9092,so1:9092 --topic test-topic

  나. consumer기동

    bin/kafka-console-consumer.sh --zookeeper sda1:2181,sda2:2181,so1:2181 --from-beginning --topic test-topic


  * producer의 console에서 텍스트 입력후 consumer에서 data가 보이면 정상적으로 작동되는것임


9. Kafka API를 이용하여 consumer프로그램 만들때 conf는 아래와 같이 설정해준다.

properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);

properties.put("group.id",user_id);

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

properties.put("auto.commit.interval.ms", "1000");

properties.put("fetch.message.max.bytes", "31457280"); // 30MB 전송가능함

properties.put("auto.offset.reset", "smallest");

번호 제목 글쓴이 날짜 조회 수
480 conda를 이용한 jupyterhub(v0.9)및 jupyter설치 (v4.4.0) 총관리자 2018.07.30 421
479 컬럼및 라인의 구분자를 지정하여 sqoop으로 데이타를 가져오고 hive테이블을 생성하는 명령문 총관리자 2018.08.03 418
478 CDP에서 AD와 Kerberos를 활용하여 인증 환경을 구축하는 3가지 방법 gooper 2022.06.10 416
» kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 총관리자 2016.05.02 412
476 Permission denied: user=hadoop, access=EXECUTE, inode="/tmp":root:supergroup:drwxrwx--- 오류해결방법 총관리자 2015.05.17 412
475 S2RDF를 실행부분만 추출하여 1건의 triple data를 HDFS에 등록, sparql을 sql로 변환, sql실행하는 방법및 S2RDF소스 컴파일 방법 총관리자 2016.06.15 410
474 원보드 컴퓨터 비교표 file 총관리자 2014.08.04 408
473 2개 data를 join하고 마지막으로 code정보를 join하여 결과를 얻는 mr 프로그램 총관리자 2014.06.30 408
472 Job이 끝난 log을 볼수 있도록 설정하기 총관리자 2016.05.30 403
471 Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.http.HttpConfig.getSchemePrefix()Ljava/lang/String; 해결->실패 총관리자 2015.06.14 402
470 [백업] 리눅스 시스템 백업하기 (Linux System Backup) - TAR 사용 시스템 전체 백업 총관리자 2022.01.19 399
469 source, sink를 직접 구현하여 사용하는 예시 총관리자 2019.05.30 398
468 Eclipse실행시 Java was started but returned exit code=1이라는 오류가 발생할때 조치방법 총관리자 2016.11.07 397
467 Cassandra 3.4(3.10) 설치/설정 (5대로 clustering) 총관리자 2016.04.11 397
466 Error: E0501 : E0501: Could not perform authorization operation, User: hadoop is not allowed to impersonate hadoop 해결하는 방법 총관리자 2015.06.07 385
465 hive metadata(hive, impala, kudu 정보가 있음) 테이블에서 db, table, owner, location를 조회하는 쿼리 총관리자 2020.02.07 380
464 sparql 문법구조 설명 file 총관리자 2015.12.09 378
463 특정문자열이나 URI를 임의로 select 절에 지정하여 사용할때 사용하는 sparql 문장 총관리자 2016.08.25 376
462 namenode오류 복구시 사용하는 명령 총관리자 2016.04.01 375
461 scan의 startrow, stoprow지정하는 방법 총관리자 2015.04.08 375

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로