메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


1. 로그를 받을 agent설정 파일(flume-conf.properties)

agent.sources = avroGenSrc
agent.channels = memoryChannel
agent.sinks = fileSink

# For each one of the sources, the type is defined
agent.sources.avroGenSrc.type = avro
agent.sources.avroGenSrc.bind = localhost
agent.sources.avroGenSrc.port = 3333

# The channel can be defined as follows.
agent.sources.avroGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.fileSink.type = file_roll
agent.sinks.fileSink.sink.directory=/home/hadoop/saved_data
agent.sinks.fileSink.sink.rollInterval = 10
agent.sinks.fileSink.sink.batchSize = 10

#Specify the channel the sink should use
agent.sinks.fileSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transctionCapacity = 10000


2. 로그를 전송하는 agent(flume-conf-agent01.properties)


agent01.sources = execGenSrc
agent01.channels = memoryChannel
agent01.sinks = avroSink

# For each one of the sources, the type is defined
agent01.sources.execGenSrc.type = exec
agent01.sources.execGenSrc.command = tail -f /home/hadoop/log_data/log1.log
agent01.sources.execGenSrc.batchSize = 10

# The channel can be defined as follows.
agent01.sources.execGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent01.sinks.avroSink.type = avro
agent01.sinks.avroSink.hostname=localhost
agent01.sinks.avroSink.port=3333
agent01.sinks.avroSink.batch-size = 10

#Specify the channel the sink should use
agent01.sinks.avroSink.channel = memoryChannel

# Each channel's type is defined.
agent01.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent01.channels.memoryChannel.capacity = 10000
agent01.channels.memoryChannel.transctionCapacity = 10000


3. log를 받을 폴더 생성및 전송할 데이타 생성

   가. 받을 폴더 : mkdir /home/hadoop/flume/saved_data

   나. 보낼 폴더 : mkdir /home/hadoop/flume/log_data

   다. 보낼파일 :  touch /home/hadoop/flume/log1.log

   라. 폴더로 이동 : cd /home/hadoop/flume

   마. log1.log에 간단하게 로그 추가 : echo "aaaaabbbbbcccc" >> log1.log

 

4. agent실행

  가. 로그를 받을 agent : flume-ng agent --conf-file ./conf/flume-conf.properties --name agent

  나. 로그를 전송할 agent : flume-ng agent --conf-file ./conf/flume-conf-agent01.properties --name agent01

번호 제목 날짜 조회 수
39 Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.http.HttpConfig.getSchemePrefix()Ljava/lang/String; 해결->실패 2015.06.14 2613
38 down된 broker로 메세지를 전송하려는 경우의 오류 내용및 조치사항 2016.08.12 2732
37 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 2016.08.18 3075
36 [CDP7.1.7] oozie sqoop action으로 import혹은 export수행시 발생한 오류에 대한 자세한 로그 확인 하는 방법 2024.04.19 3146
35 oozie 에서 sqoop action실행 에러 - 컬럼개수 차이 2014.07.17 3440
34 oracle 접속 방식에 따른 --connect 지정 방법 2022.02.11 3452
33 kafkaWordCount.scala의 producer와 consumer 클래스를 이용하여 kafka를 이용한 word count 테스트 하기 2016.08.02 3561
32 producer / consumer구현시 설정 옵션 설명 2016.10.19 3621
31 oracle 12에 sqoop해서 데이터 import하기 (console에서 sqoop import하는 방법) 2021.12.31 3759
30 kafka에서 메세지 중복 consume이 발생할 수 있는 상황 2018.10.23 3801
29 kafka의 re-balance를 이용하여 consumer를 multi thread로 돌려서 topic의 partitions을 활용 2015.03.31 4014
28 컬럼및 라인의 구분자를 지정하여 sqoop으로 데이타를 가져오고 hive테이블을 생성하는 명령문 2018.08.03 4035
27 kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 2016.05.02 4051
26 sqoop에서 oracle관련 작업할때 테이블명, 사용자명, DB명은 모두 대문자로 사용할것 2014.05.15 4088
25 source, sink를 직접 구현하여 사용하는 예시 2019.05.30 4128
24 kerberos설정된 상태의 spooldir->memory->hdfs로 저장하는 과정의 flume agent configuration구성 예시 2019.05.30 4147
23 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 2016.10.24 4194
22 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 4211
21 No broker partitions consumed by consumer thread오류 발생시 확인/조치할 사항 2016.09.02 4214
20 java.util.NoSuchElementException발생시 조치 2014.08.27 4229
위로