메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 글쓴이 날짜 조회 수
740 [CDP7.1.7] oozie sqoop action으로 import혹은 export수행시 발생한 오류에 대한 자세한 로그 확인 하는 방법 gooper 2024.04.19 0
739 [Impala] alter table구문수행시 "WARNINGS: Impala does not have READ_WRITE access to path 'hdfs://nameservice1/DATA/Temp/DB/source/table01_ccd'" 발생시 조치 gooper 2024.04.26 0
738 [CDP7.1.7, Replication]Encryption Zone내 HDFS파일을 비Encryption Zone으로 HDFS Replication시 User hdfs가 아닌 hadoop으로 수행하는 방법 gooper 2024.01.15 1
737 [CDP7.1.7]Hive Replication수행중 Specified catalog.database.table does not exist : hive.db명.table명 오류 발생시 조치방법 gooper 2024.04.05 1
736 [CDP7.1.7][Replication]Table does not match version in getMetastore(). Table view original text mismatch gooper 2024.01.02 2
735 ./gradlew :composeDown 및 ./gradlew :composeUp 를 성공했을때의 메세지 gooper 2023.02.20 6
734 호출 url현황 gooper 2023.02.21 6
733 [vue storefrontui]외부 API통합하기 참고 문서 총관리자 2022.02.09 7
732 [Cloudera Agent] Metadata-Plugin throttling_logger INFO (713 skipped) Unable to send data to nav server. Will try again. gooper 2022.05.16 7
731 [CDP7.1.7, Hive Replication]Hive Replication진행중 "The following columns have types incompatible with the existing columns in their respective positions " 오류 gooper 2023.12.27 7
730 [CDP7.1.7]Oozie job에서 ERROR: Kudu error(s) reported, first error: Timed out: Failed to write batch of 774 ops to tablet 8003f9a064bf4be5890a178439b2ba91가 발생하면서 쿼리가 실패하는 경우 gooper 2024.01.05 7
729 eclipse editor 설정방법 총관리자 2022.02.01 9
728 주문히스토리 조회 총관리자 2022.04.30 10
727 [bitbucket] 2022년 3월 2일 부터 git 작업시 기존에 사용하던 비빌번호를 사용할 수 없도록 변경되었다. 총관리자 2022.04.30 10
726 oozie의 sqoop action수행시 ooize:launcher의 applicationId를 이용하여 oozie:action의 applicationId및 관련 로그를 찾는 방법 gooper 2023.07.26 10
725 주문 생성 데이터 예시 총관리자 2022.04.30 11
724 [EncryptionZone]User:testuser not allowed to do "DECRYPT_EEK" on 'testkey' gooper 2023.06.29 11
723 [CDP7.1.7]Encryption Zone내부/외부 간 데이터 이동(mv,cp)및 CTAS, INSERT SQL시 오류(can't be moved into an encryption zone, can't be moved from an encryption zone) gooper 2023.11.14 11
722 [CDP7.1.7]impala-shell수행시 간헐적으로 "-k requires a valid kerberos ticket but no valid kerberos ticket found." 오류 gooper 2023.11.16 11
721 [Encryption Zone]Encryption Zone에 생성된 table을 select할때 HDFS /tmp/zone1에 대한 권한이 없는 경우 gooper 2023.06.29 12

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로