메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
317 특정 단계의 commit상태로 만들기(이렇게 하면 중간에 반영된 모든 commit를 history가 삭제된다) 총관리자 2016.11.17 42
316 git 초기화(Windows에서 Git Bash사용) 총관리자 2016.11.17 194
315 spark notebook 0.7.0설치및 설정 총관리자 2016.11.14 160
314 참고할만한 spark예제를 설명하는 사이트 총관리자 2016.11.11 98
313 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 총관리자 2016.11.08 524
312 Eclipse실행시 Java was started but returned exit code=1이라는 오류가 발생할때 조치방법 총관리자 2016.11.07 397
311 [SparkR]SparkR 설치 사용기 1 - Installation Guide On Yarn Cluster & Mesos Cluster & Stand Alone Cluster file 총관리자 2016.11.04 106
310 데이타 분석및 머신러닝에 도움이 도움이 되는 사이트 총관리자 2016.11.04 64
309 java스레드 덤프 분석하기 file 총관리자 2016.11.03 111
308 centos 6에서 mariadb 5.1 to 10.0 으로 upgrade 총관리자 2016.11.01 106
307 Spark Streaming 코드레벨단에서의 성능개선 총관리자 2016.10.31 44
306 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 총관리자 2016.10.31 1017
305 Flume을 이용한 데이타 수집시 HBase write 성능 튜닝 file 총관리자 2016.10.31 620
304 How-to: Build a Complex Event Processing App on Apache Spark and Drools file 총관리자 2016.10.31 253
303 How-to: Tune Your Apache Spark Jobs (Part 2) file 총관리자 2016.10.31 77
302 mybatis와 spring을 org.apache.commons.dbcp2.BasicDataSource의 DataSource로 연동할때 DB설정(참고) 총관리자 2016.10.31 990
301 Caused by: java.sql.SQLNonTransientConnectionException: Could not read resultset: unexpected end of stream, read 0 bytes from 4 오류시 확인/조치할 내용 총관리자 2016.10.31 3700
300 VisualVM 1.3.9을 이용한 spark-submit JVM 모니터링을 위한 설정및 spark-submit실행 옵션 총관리자 2016.10.28 1883
299 VisualVM 1.3.9을 이용한 JVM 모니터링 file 총관리자 2016.10.27 329
298 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 총관리자 2016.10.24 156

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로