메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
321 여러 홈페이지를 운영하거나 혹은 서버에 가입한 사용자들에게 홈페이지 계정을 나누어 줄수 있도록 설정/계정 생성방법 총관리자 2018.01.23 282
320 Eclipse 에서 bitbucket.org 연동 하기 file 총관리자 2017.06.08 283
319 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 총관리자 2017.04.19 284
318 System Properties Comparison Elasticsearch vs. Hive vs. Jena file 총관리자 2016.03.10 285
317 룰에 매칭되면 발생되는 엑티베이션 객체에 대한 작업(이전값 혹은 현재값)을 처리하는 클래스 파일 총관리자 2016.07.21 285
316 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 총관리자 2016.08.18 288
315 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 총관리자 2017.04.26 292
314 spark-submit으로 spark application실행하는 다양한 방법 총관리자 2016.05.25 303
313 Runtime.getRuntime().exec(cmd) sample 소스 총관리자 2015.11.19 305
312 cassandra cluster 문제가 있는 node제거 하기(DN상태의 노드가 있으면 cassandra cluster 전체에 문제가 발생하므로 반드시 제거할것) 총관리자 2017.06.21 309
311 Cloudera설치중 실패로 여러번 설치하는 과정에 "Running in non-interactive mode, and data appears to exist in Storage Directory /dfs/nn. Not formatting." 오류가 발생시 조치하는 방법 총관리자 2018.03.29 309
310 [Oozie]Disk I/O error: Failed to open HDFS file dhfs://..../tb_aaa/....OPYING 총관리자 2019.02.15 312
309 editLog의 문제로 발생하는 journalnode 기동 오류 발생시 조치사항 총관리자 2017.09.14 313
308 [Kudu] tablet server 혹은 kudu master가 어떤 원인에 의해서 replica가 failed상태인 경우 복구하는 방법 총관리자 2021.05.24 317
307 [tomcat] logrotate를 이용하여 catalina.out로그파일 일별로 로테이션 저장하기 file 총관리자 2017.01.18 318
306 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable원인 총관리자 2015.04.27 322
305 cloudera-scm-agent 설정파일 위치및 재시작 명령문 총관리자 2018.03.29 322
304 [JSON 파싱]mongodb의 document를 GSON을 이용하여 parsing할때 ObjectId값에서 오류 발생시 조치방법 총관리자 2017.01.18 323
303 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 총관리자 2017.04.06 325
302 Cloudera가 사용하는 서비스별 포트 총관리자 2018.03.29 326

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로