메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


1. flume설치파일 다운로드

apache-flume-1.5.2-bin.tar.gz


2. 압축풀기

tar xvfz apache-flume-1.5.2-bin.tar.gz


3. 링크생성

ln -s apache-flume-1.5.2-bin flume


4. 환경변수 수정(vi /home/hadoop/.bashrc)

export FLUME_HOME=/hadoop/flume

export PATH=$PATH:$FLUME_HOME/bin

* 변경사항 반영 : source /home/hadoop/.bashrc


5. Flume Conf

  cd $FLUME_HOME/conf

  cp flume-conf.properties.template flume.conf

  vi flume.conf


agent.sources = seqGenSrc

agent.channels = memoryChannel

agent.sinks = hdfsSink


# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = exec

agent.sources.seqGenSrc.command = tail -F /home/bigdata/hadoop-1.2.1/logs/hadoop-hadoop-namenode-localhost.localdomain.log

#가상분산환경에서 테스트용으로 잡은것.


# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = memoryChannel


# Each sink's type must be defined

agent.sinks.hdfsSink.type = hdfs

agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/flume/data     #테스트용

agent.sinks.hdfsSink.rollInterval = 30

agent.sinks.hdfsSink.sink.batchSize = 100


#Specify the channel the sink should use

agent.sinks.hdfsSink.channel = memoryChannel


# Each channel's type is defined.

agent.channels.memoryChannel.type = memory


# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100000

agent.channels.memoryChannel.transactionCapacity = 10000


6. agent기동

[hadoop@master]$ flume-ng agent -conf-file ./flume.conf --name agent

Info: Including Hadoop libraries found via (/usr/local/flume/bin/hadoop) for HDFS access

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath

Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access

Info: Excluding /usr/local/hbase/lib/slf4j-api-1.6.4.jar from classpath

Info: Excluding /usr/local/hbase/lib/slf4j-log4j12-1.6.4.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath

.....

15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting

15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume.conf

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Added sinks: hdfsSink Agent: agent

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]

15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Creating channels

15/05/21 17:38:57 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory

15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Created channel memoryChannel

15/05/21 17:38:57 INFO source.DefaultSourceFactory: Creating instance of source seqGenSrc, type exec

15/05/21 17:38:57 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfsSink, type: hdfs

15/05/21 17:38:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/05/21 17:38:58 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false

15/05/21 17:38:58 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [seqGenSrc, hdfsSink]

15/05/21 17:38:58 INFO node.Application: Starting new configuration:{ sourceRunners:{seqGenSrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:seqGenSrc,state:IDLE} }} sinkRunners:{hdfsSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3b201837 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }

15/05/21 17:38:58 INFO node.Application: Starting Channel memoryChannel

15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.

15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started

15/05/21 17:38:58 INFO node.Application: Starting Sink hdfsSink

15/05/21 17:38:58 INFO node.Application: Starting Source seqGenSrc


7. hdfs확인

cat aaa >> /home/hadoop/test.log로 데이타를 넣고 아래 명령으로 확인해본다.


[hadoop@master]$ hadoop fs -lsr /flume

drwxr-xr-x   - hadoop supergroup          0 2015-05-21 17:39 /flume/data

-rw-r--r--   3 hadoop supergroup        208 2015-05-21 17:39 /flume/data/FlumeData.1432197542415

번호 제목 날짜 조회 수
39 Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.http.HttpConfig.getSchemePrefix()Ljava/lang/String; 해결->실패 2015.06.14 2614
38 down된 broker로 메세지를 전송하려는 경우의 오류 내용및 조치사항 2016.08.12 2732
37 [CDP7.1.7] oozie sqoop action으로 import혹은 export수행시 발생한 오류에 대한 자세한 로그 확인 하는 방법 2024.04.19 3148
36 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 2016.08.18 3262
35 oozie 에서 sqoop action실행 에러 - 컬럼개수 차이 2014.07.17 3441
34 oracle 접속 방식에 따른 --connect 지정 방법 2022.02.11 3452
33 kafkaWordCount.scala의 producer와 consumer 클래스를 이용하여 kafka를 이용한 word count 테스트 하기 2016.08.02 3561
32 producer / consumer구현시 설정 옵션 설명 2016.10.19 3621
31 oracle 12에 sqoop해서 데이터 import하기 (console에서 sqoop import하는 방법) 2021.12.31 3759
30 kafka에서 메세지 중복 consume이 발생할 수 있는 상황 2018.10.23 3914
29 kafka의 re-balance를 이용하여 consumer를 multi thread로 돌려서 topic의 partitions을 활용 2015.03.31 4015
28 컬럼및 라인의 구분자를 지정하여 sqoop으로 데이타를 가져오고 hive테이블을 생성하는 명령문 2018.08.03 4048
27 kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 2016.05.02 4052
26 sqoop에서 oracle관련 작업할때 테이블명, 사용자명, DB명은 모두 대문자로 사용할것 2014.05.15 4089
25 source, sink를 직접 구현하여 사용하는 예시 2019.05.30 4129
24 kerberos설정된 상태의 spooldir->memory->hdfs로 저장하는 과정의 flume agent configuration구성 예시 2019.05.30 4187
23 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 4211
22 java.util.NoSuchElementException발생시 조치 2014.08.27 4231
21 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 2016.10.24 4241
20 [sap] Error: java.io.IOException: SQLException in nextKeyValue 오류 발생 2020.06.08 4246
위로