메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


0. test-topic은 미리 생성해둔다.
(./bin/kafka-topics.sh --create --zookeeper gsda1:2181,gsda2:2181,gsda3:2181 --replication-factor 3 --partitions 3 --topic test-topic)

1. scala-ide용 eclipse에서 아래의 소스를 편집한다.

2. 해당 프로젝트의 console창에서 "sbt clean assemlby"를 실행하여 fat jar파일을 만든다.(파일명 : icbms-assembly-2.0.jar)

3. 서버에서 producer를 실행한다.(icbms.test.KafkaWordCountProducer)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:7077,gsda2:7077 test-topic 1 1

4. 서버에서 consumer를 실행한다.(icbms.test.KafkaWordCount)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 1


* 다양한 실행방법
    (icbms-assembly-2.0.jar은 "sbt assembly"명령으로 만들어지며, icbms_2.10-2.0.jar는 "sbt package"명령으로 만들어진다.)

가. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 3

나. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

다. spark cluster에서 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master spark://gsda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

라. local모드로 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:2181,sda2:2181 testg-1 test-topic 3



-----------------scala소스 빌드용 설정파일(project.sbt) ---------------
import sbtassembly.AssemblyPlugin._

name := "icbms"

version := "2.0"

 //scalaVersion := "2.11.8"
scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
.exclude("org.mortbay.jetty", "servlet-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("com.codahale.metrics", "metrics-core")
,
"org.apache.spark" %% "spark-sql" % "1.3.1" ,
"org.apache.spark" % "spark-streaming_2.10" % "1.3.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.1" ,
"org.apache.kafka" % "kafka_2.10" % "0.9.0.1" ,
"org.apache.avro" % "avro" % "1.7.7" 
)

assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}

----------------------소스파일---------------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    //sparkConf.setMaster("spark://gsda1:7077,gsda2:7077")
    //sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
50 hive기동시 Caused by: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D 오류 발생시 조치사항 총관리자 2016.09.25 496
49 spark-submit 실행시 "java.lang.OutOfMemoryError: Java heap space"발생시 조치사항 총관리자 2018.02.01 517
48 [Kudu]ERROR: Unable to advance iterator for node with id '2' for Kudu table 'impala::core.pm0_abdasubjct': Network error: recv error from unknown peer: Transport endpoint is not connected (error 107) gooper 2023.03.16 535
47 spark client프로그램 기동시 "Error initializing SparkContext"오류 발생할때 조치사항 총관리자 2016.05.27 539
46 CentOS 7.x에 Jupyter설치 총관리자 2018.04.18 550
45 spark-shell실행시 "A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection."오류가 발생하는 경우 해결방법 총관리자 2016.05.20 551
44 spark-env.sh에서 사용할 수있는 항목. 총관리자 2016.05.24 567
» kafka로 부터 메세지를 stream으로 받아 처리하는 spark샘플소스(spark의 producer와 consumer를 sbt로 컴파일 하고 서버에서 spark-submit하는 방법) 총관리자 2016.07.13 630
42 hive테이블의 물리적인 위치인 HDFS에 여러개의 데이터 파일이 존재할때 한개의 파일로 merge하여 동일한 테이블에 입력하는 방법 총관리자 2019.05.23 640
41 lateral view 예제 총관리자 2014.09.18 691
40 [Impala 3.2버젼]compute incremental stats db명.테이블명 수행시 ERROR: AnalysisException: Incremental stats size estimate exceeds 2000.00MB. 오류 발생원인및 조치방안 gooper 2022.11.30 697
39 hive metastore ERD file 총관리자 2018.09.20 729
38 beeline실행시 User: root is not allowed to impersonate오류 발생시 조치사항 총관리자 2016.06.03 760
37 oracle to hive data type정리표 총관리자 2018.08.22 764
36 hive query에서 mapreduce돌리지 않고 select하는 방법 총관리자 2014.05.23 810
35 dual table만들기 총관리자 2014.05.16 824
34 Hive JDBC Connection과 유형별 에러및 필요한 jar파일 총관리자 2021.05.24 829
33 unique한 값 생성 총관리자 2014.04.25 888
32 json으로 존재하는 데이터 parsing하기 총관리자 2019.03.25 973
31 hive에서 insert overwrite directory.. 로 하면 default column구분자는 'SOH'혹은 't'가 됨 총관리자 2014.05.20 999

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로