메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
259 로컬의 라이브러리파일들을 dependency에 포함시키는 방법 총관리자 2016.08.09 49
258 gradle을 이용하여 jar파일 생성시 provided속성을 지정할 수 있게 설정하는 방법 총관리자 2016.08.09 75
257 [SBT] assembly시 "[error] deduplicate: different file contents found in the following:"오류 발생시 조치사항 총관리자 2016.08.04 578
256 [SBT] SBT 사용법 정리(링크) 총관리자 2016.08.04 839
255 [SBT] project.sbt에 libraryDependencies에 필요한 jar를 지정했으나 sbt compile할때 클래스를 못찾는 오류가 발생했을때 조치사항 총관리자 2016.08.03 74
254 build할때 unmappable character for encoding MS949 에러 발생시 조치사항 총관리자 2016.08.03 178
» kafkaWordCount.scala의 producer와 consumer 클래스를 이용하여 kafka를 이용한 word count 테스트 하기 총관리자 2016.08.02 97
252 bin/start-hbase.sh실행시 org.apache.hadoop.hbase.util.FileSystemVersionException: HBase file layout needs to be upgraded오류가 발생하면 조치사항 총관리자 2016.08.01 205
251 start-all.sh로 spark데몬 기동시 "JAVA_HOME is not set"오류 발생시 조치사항 총관리자 2016.08.01 126
250 hadoop클러스터를 구성하던 서버중 HA를 담당하는 서버의 hostname등이 변경되어 문제가 발생했을때 조치사항 총관리자 2016.07.29 362
249 Journal Storage Directory /data/hadoop/journal/data/mycluster not formatted 오류시 조치사항 총관리자 2016.07.29 1518
248 슬라이딩 윈도우 예제 총관리자 2016.07.28 67
247 거침없이 배우는 Drools 책의 샘플소스 file 총관리자 2016.07.22 1232
246 drools를 이용한 로그,rule matching등의 테스트 java프로그램 file 총관리자 2016.07.21 181
245 ServerInfo객체파일 총관리자 2016.07.21 35
244 drools에서 drl관련 로그를 기록하기 위한 클래스 파일 총관리자 2016.07.21 74
243 워킹 메모리에 대한 정보를 처리하는 클래스 파일 총관리자 2016.07.21 49
242 커리 변경 이벤트를 처리하기 위한 구현클래스 총관리자 2016.07.21 41
241 룰에 매칭되면 발생되는 엑티베이션 객체에 대한 작업(이전값 혹은 현재값)을 처리하는 클래스 파일 총관리자 2016.07.21 285
240 실시간 쿼리 변환 모니터링(팩트내 필드값의 변경사항을 실시간으로 추적함)하는 테스트 java 프로그램 file 총관리자 2016.07.21 67

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로