메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


0. test-topic은 미리 생성해둔다.
(./bin/kafka-topics.sh --create --zookeeper gsda1:2181,gsda2:2181,gsda3:2181 --replication-factor 3 --partitions 3 --topic test-topic)

1. scala-ide용 eclipse에서 아래의 소스를 편집한다.

2. 해당 프로젝트의 console창에서 "sbt clean assemlby"를 실행하여 fat jar파일을 만든다.(파일명 : icbms-assembly-2.0.jar)

3. 서버에서 producer를 실행한다.(icbms.test.KafkaWordCountProducer)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:7077,gsda2:7077 test-topic 1 1

4. 서버에서 consumer를 실행한다.(icbms.test.KafkaWordCount)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 1


* 다양한 실행방법
    (icbms-assembly-2.0.jar은 "sbt assembly"명령으로 만들어지며, icbms_2.10-2.0.jar는 "sbt package"명령으로 만들어진다.)

가. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 3

나. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

다. spark cluster에서 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master spark://gsda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

라. local모드로 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:2181,sda2:2181 testg-1 test-topic 3



-----------------scala소스 빌드용 설정파일(project.sbt) ---------------
import sbtassembly.AssemblyPlugin._

name := "icbms"

version := "2.0"

 //scalaVersion := "2.11.8"
scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
.exclude("org.mortbay.jetty", "servlet-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("com.codahale.metrics", "metrics-core")
,
"org.apache.spark" %% "spark-sql" % "1.3.1" ,
"org.apache.spark" % "spark-streaming_2.10" % "1.3.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.1" ,
"org.apache.kafka" % "kafka_2.10" % "0.9.0.1" ,
"org.apache.avro" % "avro" % "1.7.7" 
)

assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}

----------------------소스파일---------------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    //sparkConf.setMaster("spark://gsda1:7077,gsda2:7077")
    //sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 날짜 조회 수
750 [Shellscript]Impala view의 실제 참조 테이블 추출용 shellscript파일 2025.03.22 1003
749 엑셀에서 K ,M, G ,T 단위를 숫자로 변환 하는 수식 2025.04.09 1264
748 beeline을 이용한 impala JDBC 테스트 방법(Kerberos 설정된 상태임) 2024.11.29 1542
747 외부에서 ImpalaJDBC42.jar를 통해서 Impala에 접속시 sessions정보 2024.11.26 1626
746 test333 2017.05.01 1834
745 http://blog.naver.com... 2017.06.23 1839
744 Failed to resolve 'acme-v02.api.letsencrypt.org' ([Errno -3] Temporary failure in name resolution)" 2024.11.27 1918
743 eclipse 3.1 단축키 정리파일 2017.01.02 2058
742 5건의 triple data를 이용하여 특정 작업 폴더에서 작업하는 방법/절차 2016.06.16 2079
741 [vi] test.nq파일에서 특정문자열(예, <>)을 찾아서 포함되는 라인을 삭제한 동일한 이름의 파일을 만드는 방법 2017.01.25 2079
740 Windows에서 sbt개발환경 구축 방법(링크) 2016.06.02 2087
739 [EncryptionZone]User:testuser not allowed to do "DECRYPT_EEK" on 'testkey' 2023.06.29 2097
738 외부 jar파일을 만들려고하는jar파일의 package로 포함하는 방법 2016.08.10 2105
737 java스레드 덤프 분석하기 file 2016.11.03 2114
736 restaurant-controller,에서 등록 예시 2022.04.30 2124
735 DataSetCreator.py 실행시 파일을 찾을 수 없는 오류 2016.05.27 2129
734 실시간 쿼리 변환 모니터링(팩트내 필드값의 변경사항을 실시간으로 추적함)하는 테스트 java 프로그램 file 2016.07.21 2129
733 [oracle]10자리 timestamp값을 날짜로 변환하는 방법 2022.04.14 2164
732 하둡기반 데이타 모델링(6편) 2018.06.27 2178
731 [메모리 덤프파일 분석] 2017.03.31 2242
위로