메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


* 출처 : http://kysepark.blogspot.kr/2016/03/how-to-build-complex-event-processing.html


본 글은 클라우데라 블로그의 글을 정리한 내용이다. 원문은 아래 링크를 따라가면 볼 수 있다.
대략적인 블로그 글의 내용은 CDH를 business execution engine과 결합하여 CEP(Complex Event Processing)로서 사용할 수 있다는 내용으로 이를 무엇을 사용해서 어떻게 구현했다는 내용이다.


  1. CEP에 대해서
    1. Event Processing이란?
      1. 데이터 스트림을 추적 및 분석하여 좀 더 나은 insight와 결정을 할 수 있도록 해주는 것을 의미
    2. CEP란?
      1. Event Processing의 일종으로 여러 소스로부터 얻어낸 데이터를 결합하여 여러 이벤트들간의 패턴 및 복잡한 관계를 찾아내는 것을 의미
      2. CEP는 여러 데이터 소스들로부터 기회와 위협을 확인하고 이를 실시간으로 경고를 할 수 있도록 해준다.
      3. 오늘날 CEP는 매우 다양한 분야에서 사용되고 있다.
        1. 금융: 거래 분석, 금융사기 탐지
        2. 항공: 운항 모니터링
        3. 의료: 클레임 프로세싱, 환자 모니터링
        4. 에너와 통신: 정전 탐지
    3. 빅데이터
      1. 기하급수적인 데이터의 증가로 빅데이터 처리 필요함.
      2. CDH를 사용하여 이를 해결할 수 있다.
  2. Architecture and Design
    1. CEP를 만들기 위해 CDH를 사용한 architecture는 다음과 같다.
    2. 위 그림과 같은 architecture에서 사용된 컴포넌트들은 다음과 같다.
      1. Ingest: 이벤트 수집을 위해  Apache Kafka or Apache Flume 사용.
      2. Storage: 수집한 이벤트 저장에는 Apache HBase(또는 미래에는 아마 Kudu)를 사용.
      3. Alerting: Apache Kafka 또는 다른 직접적인 API를 통합하여 경고를 할 수 있게 함.
      4. Stream processing
        1. spark streaming을 사용하여 event processing을 처리함. 
        2. Processing은 마이크로 배치로 처리되며 다음과 같은 일을 한다.
          1. Parsing
          2. Lookup
          3. Persistence
          4. Building of current state from a series of historical events
          5. Custom processing logic
        3. 일례로 다양한 sliding-window위의 여러 Spark RDD stream을 join하여 실시간에 가깝게 insight와 trend를 얻을 수 있다.
        4. 이 배치 작업은 매번 수초 간격으로 진행되며 수초보다 적은 end-to-end latency가 나게 한다.
      5. Business process management
        1. Rules framework는 technical 또는 non-technical한 사용자들도 복잡한 business logic을 디자인할 수 있게 해준다.
        2. 이 글에서는 Apahce spark와 Drools를 함께 사용하여 business의 요구사항에 대해 평가해본다.
      6. Metrics: OpenTSDB와 같은 time-series 데이터베이스의 대쉬보드를 통해 metrics를 제공한다. 또한 Cloudera Search + HUE를 사용해 같은 기능을 사용할 수 있다.
    3. 예제로 sepsis-shock criteria(패혈증 쇼크 기준)을 Drools를 사용하여 동작시켜보자. 해당 조건들은 이곳에 언급된 내용들을 다룬다.

      1. 위 그림에서 볼 수 있듯이 24시간내에 환자의 두 개의 vital이 범위를 넘어갈 경우 SIRS 기준에 부합하게 된다. 
      2. 이 vital들은 다른 시각에 기준에 도달하게 된다. 따라서 매시간 환자의 상태를 재구성하기 위해 HBase를 사용하여 매시간 vital을 읽고 rules를 적용할 수 있게 한다.
      3. 예제를 위해 모든 event마다 모든 환자들의 vital을 가져온다고 가정하자. 그리고 snapshot/state-building step은 스킵한다.
      4. 환자가 SIRS 기준에 부합하게 되면 즉시 sepsis, severe sepsis, septic shock등을 순서대로 체크해야한다. 이 평가 프로세스의 flow chart는 다음과 같다.
      5. 모든 조건들을 찾아내고 업무자들에게 친숙하게 데모를 만들기 위해 Drools decision tables를 사용한다. 이러한 접근은 business logic을 Java/Scala code 또는 custom syntax로 가지는 것보다 좀 더 많은 관중(business analyst 포함)들에게 가시성을 제공한다.
      6. 다음은 sepsis calculator를 충족시키기 위해 만들어진 decision table이다.
      7. 위 그림에 대하여
        1. 연한 빨강색으로 채워진 모든 셀들은 code로 돌아가도록 링크되어 있다.
        2. 오렌지색으로 채워진 모든 셀들은 rule이 성공적으로 평가된 이후에 정해진 값을 가진다.
        3. 녹색으로 채워진 모든 셀들은 주어진 rule을 만족시키기 위해 들어오는 데이터들의 범위들과 값들을 가진다.
        4. 모든 파란색으로 채워진 모든 셀들은 rule들의 이름과 그 조건들이다.
      8. 다음은 Spark와 Drools의 통합의 몇 가지 목표들이다.
        1. Rule들의 실행을 Spark/Streaming으로부터 seamless하게 만든다.
        2. 단순성을 위해 rule 엔진의 stateless 부분을 사용하라. 일정한 시간 간격을 유지하며 상태를 저장하는 spark의 sliding window를 사용할 수 있다.
        3. Rule을 요구사항에 기반하여 순차적 또는 무작위로 실행하라.
        4. 몇몇 metrics를 계산하기 위해 rule 실행 결과를 spark dataframe에 넣어라.
  3. Coding
    1. 지금부터 위 목표를 달성하기 위한 절차와 코드 스니펫을 보여주겠다. 모든 코드는 https://github.com/mganta/sprue 에서 다운로드 받을 수 있다.
      1. 각 파티션에 session factory를 한번만 초기화한다. 그리고 모든 다음 dstream 실행에서 재사용한다.

        KieContainer kContainer = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
        kContainer.newStatelessKieSession();


      2. 들어오는 데이터를 HBase에 저장한다.
        //store incoming data in hbase
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertIncomingDataIntoHBase, true)
      3. RDD의 각 이벤트마다 모든 rule들을 실행하고 평가 결과를 RDD와 함께 리턴한다.
        //evaluate all the rules for the batch
            patientStream.foreachRDD(rdd => {
             val evaluatedPatients = rdd.mapPartitions(partitionOfRecords => {
                val ksession = KieSessionFactory.getKieSession(xlsFileName)
                val patients = partitionOfRecords.map(patient => {
                  ksession.execute(patient)
                  patient
                })
                patients
              })
      4. RDD를 dataframe으로 변환하고 몇몇 metrics을 계산한다.
        //convert to dataframe
                val patientdf = sqc.applySchema(evaluatedPatients, classOf[Patient])
                //compute statistics
                val countMatrix = patientdf.groupBy("location").agg(max("evaluationDate"), sum("sirsFlag"), sum("sepsisFlag"), sum("severeSepsisFlag"), sum("septicShockFlag"), sum("organDysfunctionSyndrome"))
               countMatrix.show()
      5. HBase에 업데이트를 저장한다.
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertEvaluatedDataIntoHBase, true
      6. Time-series rest api를 호출하고 마이크로 배치 metrics를 게시한다. Time-series 대쉬보드는 이 데이터를 읽을 수 있다.(여기서  OpenTSDB를 어떻게 설치하는 지를 배울 수 있다.)
        //opentsdb update statistics
        countMatrix.foreach(row => {
        TSDBUpdater.loadPatientStats(row.getString(0), row.getLong(1), row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5), row.getLong(6))
          })
      7. 위의 모든 절차들은 여기 spark driver code와 링크되어 있다.
      8. 이 예제는 무작위로 선출된 환자의 데이터 스트림 생성하기 위해 QueueStream을 사용한다. 실제 시나리오에서는 각 이벤트에서 hl7 메시지를 받게 된다.
      9. 예제를 실행하면 들어오는 각 이벤트에서 rule이 실해되는 걸 볼 수 있다. 그리고 각 상태의 그룹화된 metrics을 볼 수 있다. 이는 아래와 같다.
        Total Patients in batch: 212
        Patients with atleast one condition: 137
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199233|            3|              2|                    2|                   0|                            0|
        |      NE|      1443198199233|            8|              4|                    4|                   1|                            0|
        |      TX|      1443198199233|           10|              8|                    8|                   7|                            1|
        |      NM|      1443198199232|            8|              6|                    6|                   3|                            0|
        |      NY|      1443198199233|            6|              4|                    3|                   3|                            0|
        |      OK|      1443198199233|            7|              5|                    3|                   1|                            0|
        |      VA|      1443198199232|           16|             14|                   12|                   7|                            1|
        |      IL|      1443198199233|            5|              3|                    3|                   1|                            0|
        |      CA|      1443198199233|            7|              6|                    6|                   4|                            0|
        |      KS|      1443198199233|           12|              8|                    8|                   6|                            0|
        |      LA|      1443198199233|           14|              8|                    7|                   2|                            1|
        |      SC|      1443198199233|           12|              9|                    6|                   4|                            0|
        |      FL|      1443198199233|            7|              4|                    4|                   2|                            0|
        |      MN|      1443198199233|            8|              5|                    5|                   2|                            0|
        |      GA|      1443198199233|           14|             12|                   12|                   6|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        Total Patients in batch: 247
        Patients with atleast one condition: 170
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199242|            1|              1|                    1|                   1|                            0|
        |      NE|      1443198199241|           14|             11|                   11|                   5|                            1|
        |      TX|      1443198199242|           13|             10|                    9|                   6|                            0|
        |      NM|      1443198199241|           11|              4|                    3|                   2|                            0|
        |      NY|      1443198199241|           13|              7|                    6|                   4|                            0|
        |      OK|      1443198199241|            7|              5|                    5|                   3|                            0|
        |      VA|      1443198199242|           10|              6|                    6|                   2|                            0|
        |      IL|      1443198199241|           17|             12|                   12|                   5|                            1|
        |      CA|      1443198199241|           17|              8|                    7|                   2|                            0|
        |      KS|      1443198199242|           12|              8|                    7|                   5|                            2|
        |      LA|      1443198199242|           15|             11|                    9|                   4|                            2|
        |      SC|      1443198199241|           13|             10|                    9|                   5|                            0|
        |      FL|      1443198199241|           10|              5|                    5|                   1|                            0|
        |      MN|      1443198199241|           10|              6|                    6|                   3|                            0|
        |      GA|      1443198199241|            7|              2|                    1|                   1|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
      10.  또는 OpenTSDB를 set up하면 다음과 같은 그림을 볼 수 있다.
  4. 결론
    1. 복잡한 시스템을 설계할 때에는 rule 엔진을 사용하는게 유익한 선택이다.(Logic과 데이터의 분리는 domain expert에게 logic 결정에 대한 insight를 주면서 유연한 시스템을 낳는다.)
    2. 지금까지 본 바와 같이 CDH(for Spark, HBase, Kafka)를 rule엔진과 결합하여 사용하는게 복잡한 business logic을 평가하고 실시간으로 이를 동작시키는데 도움이 되는 걸 알 수 있다.
번호 제목 글쓴이 날짜 조회 수
59 fuseki webUI를 통해서 전체 카운트를 하면 급격하게 메모리를 소모해 버리는 문제가 있음 file 총관리자 2017.04.28 162
58 update를 많이 하면 heap memory가 많이 소진되고 최종적으로 OOM가 발생하는데 이에 대한 설명 총관리자 2017.04.10 717
57 LUBM 개수별 hadoop HDFS data사이즈 정리 총관리자 2017.04.06 142
56 protege 설명및 사용법 file 총관리자 2017.04.04 2794
55 fuseki가 제공하는 web ui를 통해서 dataset를 remove->create할 경우 동일한 동일한 이름으로 지정했을때 fuseki-server.jar가 뜨지 않는 현상 총관리자 2017.02.03 54
54 https://github.com/Merck/Halyard프로젝트 컴파일및 배포/테스트 총관리자 2017.01.24 103
53 federated query 예제 총관리자 2017.01.19 31
52 Halyard - RDF4J와 Apache HBase를 이용하여 구현된 TripleStore이며 SPARQL 1.1쿼리를 지원한다. 총관리자 2016.12.29 630
51 jena의 data폴더를 hadoop nfs를 이용하여 HDFS상의 폴더에 마운트 시키고 fuseki를 통하여 inert를 시도했을때 transaction 오류 발생 총관리자 2016.12.02 82
50 Jena 2.3를 Hadoop 2.7.2의 NFS로 mount하고 fuseki를 이용하여 start할때 오류 메세지 총관리자 2016.12.02 1557
49 S2RDF를 이용한 다른 버젼의 github링크 총관리자 2016.12.02 51
48 데이타 분석및 머신러닝에 도움이 도움이 되는 사이트 총관리자 2016.11.04 65
» How-to: Build a Complex Event Processing App on Apache Spark and Drools file 총관리자 2016.10.31 253
46 동시에 많은 요청이 endpoint로 몰려서java.net.NoRouteToHostException가 발생하는 경우의 처리방법 총관리자 2016.10.17 513
45 프로그래밍 언어별 딥러닝 라이브러리 정리 file 총관리자 2016.10.05 215
44 모두를 위한 머신러닝과 딥러닝의 강의 file 총관리자 2016.09.27 192
43 특정문자열이나 URI를 임의로 select 절에 지정하여 사용할때 사용하는 sparql 문장 총관리자 2016.08.25 376
42 jena jar파일실행시 org.apache.jena.tdb.TDB.init에서 java.lang.NullPointerException발생시 조치사항 총관리자 2016.08.19 134
41 Jena는 기본적으로 multi thread환경을 지원하지 않는다. 총관리자 2016.08.16 94
40 [Elephas] Jena Elephas를 이용하여 Spark에서 rdfTriples의 RDD를 만들고 RDD관련 작업하는 샘플소스 총관리자 2016.08.10 90

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로