메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


0. test-topic은 미리 생성해둔다.
(./bin/kafka-topics.sh --create --zookeeper gsda1:2181,gsda2:2181,gsda3:2181 --replication-factor 3 --partitions 3 --topic test-topic)

1. scala-ide용 eclipse에서 아래의 소스를 편집한다.

2. 해당 프로젝트의 console창에서 "sbt clean assemlby"를 실행하여 fat jar파일을 만든다.(파일명 : icbms-assembly-2.0.jar)

3. 서버에서 producer를 실행한다.(icbms.test.KafkaWordCountProducer)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:7077,gsda2:7077 test-topic 1 1

4. 서버에서 consumer를 실행한다.(icbms.test.KafkaWordCount)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 1


* 다양한 실행방법
    (icbms-assembly-2.0.jar은 "sbt assembly"명령으로 만들어지며, icbms_2.10-2.0.jar는 "sbt package"명령으로 만들어진다.)

가. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 3

나. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

다. spark cluster에서 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master spark://gsda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

라. local모드로 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:2181,sda2:2181 testg-1 test-topic 3



-----------------scala소스 빌드용 설정파일(project.sbt) ---------------
import sbtassembly.AssemblyPlugin._

name := "icbms"

version := "2.0"

 //scalaVersion := "2.11.8"
scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
.exclude("org.mortbay.jetty", "servlet-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("com.codahale.metrics", "metrics-core")
,
"org.apache.spark" %% "spark-sql" % "1.3.1" ,
"org.apache.spark" % "spark-streaming_2.10" % "1.3.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.1" ,
"org.apache.kafka" % "kafka_2.10" % "0.9.0.1" ,
"org.apache.avro" % "avro" % "1.7.7" 
)

assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}

----------------------소스파일---------------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    //sparkConf.setMaster("spark://gsda1:7077,gsda2:7077")
    //sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
301 jena/fuseki 3.4.0 설치 총관리자 2017.07.25 172
300 Windows7 64bit 환경에서 Apache Hadoop 2.7.1설치하기 총관리자 2017.07.26 236
299 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 총관리자 2017.07.26 260
298 [oneM2M]Ontologies used for oneM2M 총관리자 2017.08.02 49
297 DeviceType이 o:motion-sensor_33 이거나 o:motion-sensor_32 경우의 sparql문장은 다음과 같다. 총관리자 2017.08.16 40
296 RDF4J의 rdf4j-server.war가 제공하는 RESTFul API를 이용하여 repository에 CRUD테스트 총관리자 2017.08.30 51
295 RDF4J의 rdf4j-server.war가 제공하는 RESTFul API를 이용한 CRUD테스트(트랜잭션처리) 총관리자 2017.08.30 43
294 RDF4J의 RESTFul API처리 클래스 소스 파악(web module위주) 총관리자 2017.08.30 156
293 파일은 남겨두고 파일 내용만 지우고자 할 때. 총관리자 2017.08.30 32
292 hadoop cluster구성된 노드를 확인시 Capacity를 보면 색이 붉은색으로 표시되어 있는 경우나 Unhealthy인 경우 처리방법 총관리자 2017.08.30 46
291 halyard 1.3의 console을 이용하여 100억건의 데이타에 대한 쿼리수행시 ScannerTimeoutException 발생시 조치사항 총관리자 2017.09.06 134
290 editLog의 문제로 발생하는 journalnode 기동 오류 발생시 조치사항 총관리자 2017.09.14 313
289 core 'gc_shard3_replica2' is already locked라는 오류가 발생할때 조치사항 총관리자 2017.09.14 35
288 fuseki에서 제공하는 script중 s-post를 사용하는 예문 총관리자 2017.09.15 34
287 python3.5에서 numpy버젼에 따른 문제점을 조치하는 방법및 pymysql import할때 오류 발생시 조치사항 총관리자 2017.09.28 80
286 Hadoop 2.7.x에서 사용할 수 있는 파일/디렉토리 관련 util성 클래스 파일 총관리자 2017.09.28 76
285 lagom의 online-auction-java프로젝트 실행시 "Could not find Cassandra contact points, due to: ServiceLocator is not bound" 경고 발생시 조치사항 총관리자 2017.10.12 219
284 lagom의 online-auction-java프로젝트 실행시 외부의 kafka/cassandra를 사용하도록 설정하는 방법 총관리자 2017.10.12 211
283 lagom-linux용 build.sbt파일 내용 총관리자 2017.10.12 1300
282 lagom-windows용 build.sbt파일 내용 총관리자 2017.10.12 60

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로