메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


0. test-topic은 미리 생성해둔다.
(./bin/kafka-topics.sh --create --zookeeper gsda1:2181,gsda2:2181,gsda3:2181 --replication-factor 3 --partitions 3 --topic test-topic)

1. scala-ide용 eclipse에서 아래의 소스를 편집한다.

2. 해당 프로젝트의 console창에서 "sbt clean assemlby"를 실행하여 fat jar파일을 만든다.(파일명 : icbms-assembly-2.0.jar)

3. 서버에서 producer를 실행한다.(icbms.test.KafkaWordCountProducer)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:7077,gsda2:7077 test-topic 1 1

4. 서버에서 consumer를 실행한다.(icbms.test.KafkaWordCount)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 1


* 다양한 실행방법
    (icbms-assembly-2.0.jar은 "sbt assembly"명령으로 만들어지며, icbms_2.10-2.0.jar는 "sbt package"명령으로 만들어진다.)

가. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 3

나. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

다. spark cluster에서 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master spark://gsda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

라. local모드로 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:2181,sda2:2181 testg-1 test-topic 3



-----------------scala소스 빌드용 설정파일(project.sbt) ---------------
import sbtassembly.AssemblyPlugin._

name := "icbms"

version := "2.0"

 //scalaVersion := "2.11.8"
scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
.exclude("org.mortbay.jetty", "servlet-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("com.codahale.metrics", "metrics-core")
,
"org.apache.spark" %% "spark-sql" % "1.3.1" ,
"org.apache.spark" % "spark-streaming_2.10" % "1.3.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.1" ,
"org.apache.kafka" % "kafka_2.10" % "0.9.0.1" ,
"org.apache.avro" % "avro" % "1.7.7" 
)

assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}

----------------------소스파일---------------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    //sparkConf.setMaster("spark://gsda1:7077,gsda2:7077")
    //sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 날짜 조회 수
105 hive metadata(hive, impala, kudu 정보가 있음) 테이블에서 db, table, owner, location를 조회하는 쿼리 2020.02.07 4709
104 hive테이블의 물리적인 위치인 HDFS에 여러개의 데이터 파일이 존재할때 한개의 파일로 merge하여 동일한 테이블에 입력하는 방법 2019.05.23 4702
103 spark client프로그램 기동시 "Error initializing SparkContext"오류 발생할때 조치사항 2016.05.27 4674
102 Hive java connection 설정 file 2013.04.01 4661
101 [TLS/SSL]Cloudera CDH6.3.4기준 Hue TLS설정 항목 2022.05.13 4658
100 hiverserver2기동시 connection refused가 발생하는 경우 조치방법 2014.05.22 4646
99 kudu 테이블 metadata강제 삭제시 발생하는 오류 메세지 2022.01.12 4636
98 hive에서 생성된 external table에서 hbase의 table에 값 insert하기 2014.04.11 4632
97 json serde사용법 2014.04.17 4625
96 Tracking URL = N/A 가발생하는 경우 - 환경설정값을 잘못설정하는 경우에 발생함 2015.06.17 4618
95 service name방식의 oracle을 메타정보 저장소로 사용할때 Hue Configuration설정하는 방법 2022.02.12 4610
94 you are accessing a non-optimized hue please switch to one of the available addresses 2021.10.06 4607
93 FAILED: IllegalStateException Variable substitution depth too large: 40 오류발생시 조치사항 2014.08.19 4593
92 unique한 값 생성 2014.04.25 4585
91 [TLS]TLS용 사설 인증서 변경 혹은 신규 지정시 No trusted certificate found 오류 발생시 확인및 조치사항 2022.03.15 4579
90 kudu table와 impala(hive) table정보가 틀어져서 테이블을 읽지 못하는 경우(Error Loading Metadata) 조치방법 2023.11.10 4567
89 "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"오류 발생시 조치사항 2016.05.25 4563
88 [Cloudera 6.3.4, Kudu]]Service Monitor에서 사용하는 metric중에 일부를 blacklist로 설정하여 모니터링 정보 수집 제외하는 방법 2022.07.08 4563
87 Spark 1.6.1 설치후 HA구성 2016.05.24 4533
86 sequence한 번호 생성방법 2014.04.25 4525
위로