Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.
1 2 3 4 5 6 7 8 9 10 11 | object StreamingRequestCount { def main(args : Array[String]) { val ssc = new StreamingContext( new SparkContext(),Seconds( 2 )) val mystream = ssc.socketTextStream(hostname, port) val userreqs = mystream.map(line = > (line.split( ' ' )( 2 ), 1 )) .reduceByKey((x,y) = > x+y) userreqs.print() ssc.start() ssc.awaitTermination() } } |