메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
321 VPS에서는 root로 실행해도 swap파일을 만들지 못하게 만들어 두었지만 swap파일을 생성하는 방법 총관리자 2017.06.20 120
320 cassandra cluster 문제가 있는 node제거 하기(DN상태의 노드가 있으면 cassandra cluster 전체에 문제가 발생하므로 반드시 제거할것) 총관리자 2017.06.21 309
319 Not enough replica available for query at consistency QUORUM가 발생하는 경우 총관리자 2017.06.21 256
318 http://blog.naver.com... 총관리자 2017.06.23 88
317 elasticsearch 기동시 permission denied on key 'vm.max_map_count' 오류발생시 조치사항 총관리자 2017.06.23 431
316 solr 6.2에 한글 형태소 분석기(arirang 6.x) 적용 및 테스트 file 총관리자 2017.06.27 881
315 mysql에서 외부 디비를 커넥션할 경우 접속 속도가 느려질때 총관리자 2017.06.30 1083
314 solr명령 실행시 "Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect" 오류발생 총관리자 2017.06.30 202
313 python test.py실행시 "ImportError: No module named pyspark" 혹은 "ImportError: No module named py4j.protocol"등의 오류 발생시 조치사항 총관리자 2017.07.04 766
312 halyard 1.3을 다른 서버로 이전하는 방법 총관리자 2017.07.05 66
311 halyard 1.3의 rdf4j-server.war와 rdf4j-workbench.war를 tomcat deploy후 조회시 java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/Cell발생시 조치사항 총관리자 2017.07.05 65
310 halyard의 console스크립트에서 생성한 repository는 RDF4J Web Applications에서 공유가 되지 않는다. 총관리자 2017.07.05 45
309 schema.xml vs managed-schema 지정 사용하기 - 두개를 동시에 사용할 수는 없음 총관리자 2017.07.09 153
308 HBase write 성능 튜닝 file 총관리자 2017.07.18 87
307 HBase 설정 최적화하기(VCNC) file 총관리자 2017.07.18 120
306 Current heap configuration for MemStore and BlockCache exceeds the threshold required for successful cluster operation 총관리자 2017.07.18 892
305 갑자기 DataNode가 java.io.IOException: Premature EOF from inputStream를 반복적으로 발생시키다가 java.lang.OutOfMemoryError: Java heap space를 내면서 죽는 경우 조치방법 총관리자 2017.07.19 1685
304 9대가 hbase cluster로 구성된 서버에서 테스트 data를 halyard에 적재하고 테스트 하는 방법및 절차 총관리자 2017.07.21 56
303 Core with name 'xx_shard4_replica1' already exists. 발생시 조치사항 총관리자 2017.07.22 62
302 LUBM 데이타 생성구문 총관리자 2017.07.24 143

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로