메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


1. flume설치파일 다운로드

apache-flume-1.5.2-bin.tar.gz


2. 압축풀기

tar xvfz apache-flume-1.5.2-bin.tar.gz


3. 링크생성

ln -s apache-flume-1.5.2-bin flume


4. 환경변수 수정(vi /home/hadoop/.bashrc)

export FLUME_HOME=/hadoop/flume

export PATH=$PATH:$FLUME_HOME/bin

* 변경사항 반영 : source /home/hadoop/.bashrc


5. Flume Conf

  cd $FLUME_HOME/conf

  cp flume-conf.properties.template flume.conf

  vi flume.conf


agent.sources = seqGenSrc

agent.channels = memoryChannel

agent.sinks = hdfsSink


# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = exec

agent.sources.seqGenSrc.command = tail -F /home/bigdata/hadoop-1.2.1/logs/hadoop-hadoop-namenode-localhost.localdomain.log

#가상분산환경에서 테스트용으로 잡은것.


# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = memoryChannel


# Each sink's type must be defined

agent.sinks.hdfsSink.type = hdfs

agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/flume/data     #테스트용

agent.sinks.hdfsSink.rollInterval = 30

agent.sinks.hdfsSink.sink.batchSize = 100


#Specify the channel the sink should use

agent.sinks.hdfsSink.channel = memoryChannel


# Each channel's type is defined.

agent.channels.memoryChannel.type = memory


# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100000

agent.channels.memoryChannel.transactionCapacity = 10000


6. agent기동

[hadoop@master]$ flume-ng agent -conf-file ./flume.conf --name agent

Info: Including Hadoop libraries found via (/usr/local/flume/bin/hadoop) for HDFS access

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath

Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access

Info: Excluding /usr/local/hbase/lib/slf4j-api-1.6.4.jar from classpath

Info: Excluding /usr/local/hbase/lib/slf4j-log4j12-1.6.4.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath

.....

15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting

15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume.conf

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Added sinks: hdfsSink Agent: agent

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]

15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Creating channels

15/05/21 17:38:57 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory

15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Created channel memoryChannel

15/05/21 17:38:57 INFO source.DefaultSourceFactory: Creating instance of source seqGenSrc, type exec

15/05/21 17:38:57 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfsSink, type: hdfs

15/05/21 17:38:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/05/21 17:38:58 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false

15/05/21 17:38:58 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [seqGenSrc, hdfsSink]

15/05/21 17:38:58 INFO node.Application: Starting new configuration:{ sourceRunners:{seqGenSrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:seqGenSrc,state:IDLE} }} sinkRunners:{hdfsSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3b201837 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }

15/05/21 17:38:58 INFO node.Application: Starting Channel memoryChannel

15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.

15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started

15/05/21 17:38:58 INFO node.Application: Starting Sink hdfsSink

15/05/21 17:38:58 INFO node.Application: Starting Source seqGenSrc


7. hdfs확인

cat aaa >> /home/hadoop/test.log로 데이타를 넣고 아래 명령으로 확인해본다.


[hadoop@master]$ hadoop fs -lsr /flume

drwxr-xr-x   - hadoop supergroup          0 2015-05-21 17:39 /flume/data

-rw-r--r--   3 hadoop supergroup        208 2015-05-21 17:39 /flume/data/FlumeData.1432197542415

번호 제목 글쓴이 날짜 조회 수
39 import 혹은 export할때 hive파일의 default 구분자는 --input-fields-terminated-by "x01"와 같이 지정해야함 총관리자 2014.05.20 4244
38 sqoop작업시 hdfs의 개수보다 더많은 값이 중복되어 oracle에 입력되는 경우가 있음 총관리자 2014.09.02 4093
37 다수의 로그 에이전트로 부터 로그를 받아 각각의 파일로 저장하는 방법(interceptor및 multiplexing) 총관리자 2014.04.04 4089
36 sqoop 1.4.4 설치및 테스트 총관리자 2014.04.21 3134
35 kafka broker기동시 brokerId가 달라서 기동에 실패하는 경우 조치방법 총관리자 2016.05.02 2331
34 hadoop 2.6.0에 sqoop2 (1.99.5) server및 client설치 == fail 총관리자 2015.06.11 1770
33 sqoop에서 oracle관련 작업할때 테이블명, 사용자명, DB명은 모두 대문자로 사용할것 총관리자 2014.05.15 1527
» flume 1.5.2 설치및 테스트(source : file, sink : hdfs) in HA 총관리자 2015.05.21 1415
31 avro 사용하기(avsc 스키마 파일 컴파일 방법, consumer, producer샘플소스) 총관리자 2016.07.08 1269
30 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 총관리자 2016.10.31 1021
29 oozie 에서 sqoop action실행 에러 - 컬럼개수 차이 총관리자 2014.07.17 1002
28 동일서버에서 LA와 LC동시에 기동하여 테스트 총관리자 2014.04.01 928
27 sqoop export/import등을 할때 driver를 못찾는 오류가 발생하면... 총관리자 2014.05.15 863
26 source의 type을 spooldir로 하는 경우 해당 경로에 파일이 들어오면 파일단위로 전송함 총관리자 2014.05.20 687
25 sqoop으로 mariadb에 접근해서 hive 테이블로 자동으로 생성하기 총관리자 2018.08.03 670
24 kafka-manager 1.3.3.4 설정및 실행하기 총관리자 2017.03.20 617
23 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 총관리자 2016.11.08 526
22 java.util.NoSuchElementException발생시 조치 총관리자 2014.08.27 476
21 컬럼및 라인의 구분자를 지정하여 sqoop으로 데이타를 가져오고 hive테이블을 생성하는 명령문 총관리자 2018.08.03 418
20 kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 총관리자 2016.05.02 412

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로