메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


* 출처 : http://kysepark.blogspot.kr/2016/03/how-to-build-complex-event-processing.html


본 글은 클라우데라 블로그의 글을 정리한 내용이다. 원문은 아래 링크를 따라가면 볼 수 있다.
대략적인 블로그 글의 내용은 CDH를 business execution engine과 결합하여 CEP(Complex Event Processing)로서 사용할 수 있다는 내용으로 이를 무엇을 사용해서 어떻게 구현했다는 내용이다.


  1. CEP에 대해서
    1. Event Processing이란?
      1. 데이터 스트림을 추적 및 분석하여 좀 더 나은 insight와 결정을 할 수 있도록 해주는 것을 의미
    2. CEP란?
      1. Event Processing의 일종으로 여러 소스로부터 얻어낸 데이터를 결합하여 여러 이벤트들간의 패턴 및 복잡한 관계를 찾아내는 것을 의미
      2. CEP는 여러 데이터 소스들로부터 기회와 위협을 확인하고 이를 실시간으로 경고를 할 수 있도록 해준다.
      3. 오늘날 CEP는 매우 다양한 분야에서 사용되고 있다.
        1. 금융: 거래 분석, 금융사기 탐지
        2. 항공: 운항 모니터링
        3. 의료: 클레임 프로세싱, 환자 모니터링
        4. 에너와 통신: 정전 탐지
    3. 빅데이터
      1. 기하급수적인 데이터의 증가로 빅데이터 처리 필요함.
      2. CDH를 사용하여 이를 해결할 수 있다.
  2. Architecture and Design
    1. CEP를 만들기 위해 CDH를 사용한 architecture는 다음과 같다.
    2. 위 그림과 같은 architecture에서 사용된 컴포넌트들은 다음과 같다.
      1. Ingest: 이벤트 수집을 위해  Apache Kafka or Apache Flume 사용.
      2. Storage: 수집한 이벤트 저장에는 Apache HBase(또는 미래에는 아마 Kudu)를 사용.
      3. Alerting: Apache Kafka 또는 다른 직접적인 API를 통합하여 경고를 할 수 있게 함.
      4. Stream processing
        1. spark streaming을 사용하여 event processing을 처리함. 
        2. Processing은 마이크로 배치로 처리되며 다음과 같은 일을 한다.
          1. Parsing
          2. Lookup
          3. Persistence
          4. Building of current state from a series of historical events
          5. Custom processing logic
        3. 일례로 다양한 sliding-window위의 여러 Spark RDD stream을 join하여 실시간에 가깝게 insight와 trend를 얻을 수 있다.
        4. 이 배치 작업은 매번 수초 간격으로 진행되며 수초보다 적은 end-to-end latency가 나게 한다.
      5. Business process management
        1. Rules framework는 technical 또는 non-technical한 사용자들도 복잡한 business logic을 디자인할 수 있게 해준다.
        2. 이 글에서는 Apahce spark와 Drools를 함께 사용하여 business의 요구사항에 대해 평가해본다.
      6. Metrics: OpenTSDB와 같은 time-series 데이터베이스의 대쉬보드를 통해 metrics를 제공한다. 또한 Cloudera Search + HUE를 사용해 같은 기능을 사용할 수 있다.
    3. 예제로 sepsis-shock criteria(패혈증 쇼크 기준)을 Drools를 사용하여 동작시켜보자. 해당 조건들은 이곳에 언급된 내용들을 다룬다.

      1. 위 그림에서 볼 수 있듯이 24시간내에 환자의 두 개의 vital이 범위를 넘어갈 경우 SIRS 기준에 부합하게 된다. 
      2. 이 vital들은 다른 시각에 기준에 도달하게 된다. 따라서 매시간 환자의 상태를 재구성하기 위해 HBase를 사용하여 매시간 vital을 읽고 rules를 적용할 수 있게 한다.
      3. 예제를 위해 모든 event마다 모든 환자들의 vital을 가져온다고 가정하자. 그리고 snapshot/state-building step은 스킵한다.
      4. 환자가 SIRS 기준에 부합하게 되면 즉시 sepsis, severe sepsis, septic shock등을 순서대로 체크해야한다. 이 평가 프로세스의 flow chart는 다음과 같다.
      5. 모든 조건들을 찾아내고 업무자들에게 친숙하게 데모를 만들기 위해 Drools decision tables를 사용한다. 이러한 접근은 business logic을 Java/Scala code 또는 custom syntax로 가지는 것보다 좀 더 많은 관중(business analyst 포함)들에게 가시성을 제공한다.
      6. 다음은 sepsis calculator를 충족시키기 위해 만들어진 decision table이다.
      7. 위 그림에 대하여
        1. 연한 빨강색으로 채워진 모든 셀들은 code로 돌아가도록 링크되어 있다.
        2. 오렌지색으로 채워진 모든 셀들은 rule이 성공적으로 평가된 이후에 정해진 값을 가진다.
        3. 녹색으로 채워진 모든 셀들은 주어진 rule을 만족시키기 위해 들어오는 데이터들의 범위들과 값들을 가진다.
        4. 모든 파란색으로 채워진 모든 셀들은 rule들의 이름과 그 조건들이다.
      8. 다음은 Spark와 Drools의 통합의 몇 가지 목표들이다.
        1. Rule들의 실행을 Spark/Streaming으로부터 seamless하게 만든다.
        2. 단순성을 위해 rule 엔진의 stateless 부분을 사용하라. 일정한 시간 간격을 유지하며 상태를 저장하는 spark의 sliding window를 사용할 수 있다.
        3. Rule을 요구사항에 기반하여 순차적 또는 무작위로 실행하라.
        4. 몇몇 metrics를 계산하기 위해 rule 실행 결과를 spark dataframe에 넣어라.
  3. Coding
    1. 지금부터 위 목표를 달성하기 위한 절차와 코드 스니펫을 보여주겠다. 모든 코드는 https://github.com/mganta/sprue 에서 다운로드 받을 수 있다.
      1. 각 파티션에 session factory를 한번만 초기화한다. 그리고 모든 다음 dstream 실행에서 재사용한다.

        KieContainer kContainer = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
        kContainer.newStatelessKieSession();


      2. 들어오는 데이터를 HBase에 저장한다.
        //store incoming data in hbase
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertIncomingDataIntoHBase, true)
      3. RDD의 각 이벤트마다 모든 rule들을 실행하고 평가 결과를 RDD와 함께 리턴한다.
        //evaluate all the rules for the batch
            patientStream.foreachRDD(rdd => {
             val evaluatedPatients = rdd.mapPartitions(partitionOfRecords => {
                val ksession = KieSessionFactory.getKieSession(xlsFileName)
                val patients = partitionOfRecords.map(patient => {
                  ksession.execute(patient)
                  patient
                })
                patients
              })
      4. RDD를 dataframe으로 변환하고 몇몇 metrics을 계산한다.
        //convert to dataframe
                val patientdf = sqc.applySchema(evaluatedPatients, classOf[Patient])
                //compute statistics
                val countMatrix = patientdf.groupBy("location").agg(max("evaluationDate"), sum("sirsFlag"), sum("sepsisFlag"), sum("severeSepsisFlag"), sum("septicShockFlag"), sum("organDysfunctionSyndrome"))
               countMatrix.show()
      5. HBase에 업데이트를 저장한다.
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertEvaluatedDataIntoHBase, true
      6. Time-series rest api를 호출하고 마이크로 배치 metrics를 게시한다. Time-series 대쉬보드는 이 데이터를 읽을 수 있다.(여기서  OpenTSDB를 어떻게 설치하는 지를 배울 수 있다.)
        //opentsdb update statistics
        countMatrix.foreach(row => {
        TSDBUpdater.loadPatientStats(row.getString(0), row.getLong(1), row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5), row.getLong(6))
          })
      7. 위의 모든 절차들은 여기 spark driver code와 링크되어 있다.
      8. 이 예제는 무작위로 선출된 환자의 데이터 스트림 생성하기 위해 QueueStream을 사용한다. 실제 시나리오에서는 각 이벤트에서 hl7 메시지를 받게 된다.
      9. 예제를 실행하면 들어오는 각 이벤트에서 rule이 실해되는 걸 볼 수 있다. 그리고 각 상태의 그룹화된 metrics을 볼 수 있다. 이는 아래와 같다.
        Total Patients in batch: 212
        Patients with atleast one condition: 137
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199233|            3|              2|                    2|                   0|                            0|
        |      NE|      1443198199233|            8|              4|                    4|                   1|                            0|
        |      TX|      1443198199233|           10|              8|                    8|                   7|                            1|
        |      NM|      1443198199232|            8|              6|                    6|                   3|                            0|
        |      NY|      1443198199233|            6|              4|                    3|                   3|                            0|
        |      OK|      1443198199233|            7|              5|                    3|                   1|                            0|
        |      VA|      1443198199232|           16|             14|                   12|                   7|                            1|
        |      IL|      1443198199233|            5|              3|                    3|                   1|                            0|
        |      CA|      1443198199233|            7|              6|                    6|                   4|                            0|
        |      KS|      1443198199233|           12|              8|                    8|                   6|                            0|
        |      LA|      1443198199233|           14|              8|                    7|                   2|                            1|
        |      SC|      1443198199233|           12|              9|                    6|                   4|                            0|
        |      FL|      1443198199233|            7|              4|                    4|                   2|                            0|
        |      MN|      1443198199233|            8|              5|                    5|                   2|                            0|
        |      GA|      1443198199233|           14|             12|                   12|                   6|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        Total Patients in batch: 247
        Patients with atleast one condition: 170
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199242|            1|              1|                    1|                   1|                            0|
        |      NE|      1443198199241|           14|             11|                   11|                   5|                            1|
        |      TX|      1443198199242|           13|             10|                    9|                   6|                            0|
        |      NM|      1443198199241|           11|              4|                    3|                   2|                            0|
        |      NY|      1443198199241|           13|              7|                    6|                   4|                            0|
        |      OK|      1443198199241|            7|              5|                    5|                   3|                            0|
        |      VA|      1443198199242|           10|              6|                    6|                   2|                            0|
        |      IL|      1443198199241|           17|             12|                   12|                   5|                            1|
        |      CA|      1443198199241|           17|              8|                    7|                   2|                            0|
        |      KS|      1443198199242|           12|              8|                    7|                   5|                            2|
        |      LA|      1443198199242|           15|             11|                    9|                   4|                            2|
        |      SC|      1443198199241|           13|             10|                    9|                   5|                            0|
        |      FL|      1443198199241|           10|              5|                    5|                   1|                            0|
        |      MN|      1443198199241|           10|              6|                    6|                   3|                            0|
        |      GA|      1443198199241|            7|              2|                    1|                   1|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
      10.  또는 OpenTSDB를 set up하면 다음과 같은 그림을 볼 수 있다.
  4. 결론
    1. 복잡한 시스템을 설계할 때에는 rule 엔진을 사용하는게 유익한 선택이다.(Logic과 데이터의 분리는 domain expert에게 logic 결정에 대한 insight를 주면서 유연한 시스템을 낳는다.)
    2. 지금까지 본 바와 같이 CDH(for Spark, HBase, Kafka)를 rule엔진과 결합하여 사용하는게 복잡한 business logic을 평가하고 실시간으로 이를 동작시키는데 도움이 되는 걸 알 수 있다.
번호 제목 글쓴이 날짜 조회 수
59 halyard 1.3을 다른 서버로 이전하는 방법 총관리자 2017.07.05 66
58 전체 컨택스트 내용 file 총관리자 2017.12.19 66
57 실시간 쿼리 변환 모니터링(팩트내 필드값의 변경사항을 실시간으로 추적함)하는 테스트 java 프로그램 file 총관리자 2016.07.21 67
56 슬라이딩 윈도우 예제 총관리자 2016.07.28 67
55 SPIN(SPARQL Inference Notation)이란.. file 총관리자 2016.02.25 73
54 drools에서 drl관련 로그를 기록하기 위한 클래스 파일 총관리자 2016.07.21 74
53 S2RDF 테스트(벤치마크 테스트를 기준으로 python, scala소스가 만들어져서 기능은 파악되지 못함) [2] file 총관리자 2016.05.27 76
52 oneM2M Specification(Draft Release 3, 2, 1), Draft Technical Reports 총관리자 2017.10.25 81
51 jena의 data폴더를 hadoop nfs를 이용하여 HDFS상의 폴더에 마운트 시키고 fuseki를 통하여 inert를 시도했을때 transaction 오류 발생 총관리자 2016.12.02 82
50 [Elephas] Jena Elephas를 이용하여 Spark에서 rdfTriples의 RDD를 만들고 RDD관련 작업하는 샘플소스 총관리자 2016.08.10 90
49 Jena는 기본적으로 multi thread환경을 지원하지 않는다. 총관리자 2016.08.16 94
48 시맨틱 관련 논문 모음 사이트 총관리자 2017.06.13 94
47 RDF storage조합에대한 test결과(4store, Jena+HBase, Hive+HBase, CumulusRDF, Couchbase) 페이지 링크 총관리자 2016.05.26 102
46 fuseki의 endpoint를 이용한 insert, delete하는 sparql예시 총관리자 2018.02.14 102
45 https://github.com/Merck/Halyard프로젝트 컴파일및 배포/테스트 총관리자 2017.01.24 103
44 TopBraid Composer에서 SPIN 사용법 file 총관리자 2016.02.25 104
43 solrcloud에 solrdf1.1설치하고 테스트 하기 총관리자 2016.04.22 113
42 select와 group by결과 값이 없는경우의 리턴 값이 다름 file 총관리자 2016.02.05 119
41 protege 4.3 다운로드 총관리자 2015.12.09 126
40 한번에 여러값 update하기 총관리자 2016.01.13 134

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로