분석을 위한 Analytics 생성

실습 소개

Kinesis Analytics를 활용하여 스트리밍 데이터를 처리할 application을 생성하고 처리된 결과를 S3에 쌓을 수 있도록 Firehose에 연결합니다.

실습 순서

Kinesis Analytics Application - 생성

Kinesis Analytics를 활용하여 스트리밍 데이터를 처리할 application을 생성합니다.

  1. AWS Management Console에서 Kinesis 서비스에 접속합니다.

  2. Kinesis analytics applicationsCreate analytics application 버튼을 클릭합니다. create-analytics-01

  3. Application name에 StreamApplication을 입력하고, Description에는 Streaming data processing application를 입력합니다. Runtime은 SQL 을 선택하고, 우측 하단의 Create application 버튼을 클릭합니다. create-analytics-02

  4. 성공적으로 Kinesis Analytics Application 이 만들어 졌습니다.
    create-analytics-03

    Kinesis Analytics Application은 크게 3가지 영역으로 구성되어 있습니다. 

  • 데이터 유입을 위한 Source 영역
  • Real time analytics 작업을 위한 영역
  • 분석 결과를 받아줄 Destination 영역

Kinesis Analytics Application - Source 지정

데이터를 수집하기 위한 source를 지정합니다.

  1. 위 생성된 화면에서 Connect streaming data 버튼을 클릭합니다.

  2. Choose source를 선택하고, Source의 서비스는 Kinesis Firehose delivery stream 을 선택하고 앞서 생성한 2개의 스트림 중 Source 를 선택합니다. create-analytics-04

  3. 그리고 하단의 Discover schema 버튼을 클릭하여 Schema를 분석합니다. create-analytics-05

  4. Schema 분석이 끝나면 아래에 샘플 데이터를 보여줍니다. 데이터 타입을 바르게 수정하기 위해 Edit schema 버튼을 클릭합니다. create-analytics-06

    이 때 Source data가 없다는 문구가 표시된다면 EC2에서 firehose.py가 실행 중인지 확인 합니다.

  5. OccurrenceStartDateDiscoveryDate 의 타입을 DATE 로 변경합니다.
    NetLoss, RecoveryAmount, EstimatedGrossLossDOUBLE 로 변경합니다.
    우측 하단의 Save schema and update stream samples 버튼을 클릭하여 저장합니다. create-analytics-07

  6. 저장이 진행되는 것을 확인할 수 있습니다. create-analytics-08

  7. 저장이 완료되면 Exit(done) 텍스트를 클릭하고, Source 지정을 완료합니다. create-analytics-09

  8. Source 지정이 성공적으로 완료된 것을 확인합니다. create-analytics-10


Kinesis Analytics Application - SQL 실시간 쿼리

실시간 데이터 분석을 위해서 SQL을 생성합니다.

  1. Go to SQL Editor 버튼을 클릭합니다. create-analytics-11

  2. Text editor에 다음과 같은 SQL문을 입력한 뒤 Save and run SQL 버튼을 클릭합니다. create-analytics-12

CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM"
    (Region VARCHAR(16), Business VARCHAR(32), Name VARCHAR(16), Status VARCHAR(16), RiskCategory VARCHAR(64),
    RiskSubCategory VARCHAR(64), DiscoveryDate DATE, OccurrenceStartDate DATE, NetLoss DOUBLE, RecoveryAmount DOUBLE,
    EstimatedGrossLoss DOUBLE);
    
CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "DESTINATION_SQL_BASIC_STREAM"

SELECT STREAM "Region", "Business", "Name", "Status", "RiskCategory", "RiskSubCategory", "DiscoveryDate",
    "OccurrenceStartDate", "NetLoss", "RecoveryAmount", "EstimatedGrossLoss" FROM "SOURCE_SQL_STREAM_001";

이 때 Source data가 없다는 문구가 표시된다면 EC2에서 firehose.py가 실행 중인지 확인 후 Refresh 합니다.

  1. SQL 쿼리가 수행을 준비하는 것을 확인할 수 있습니다. create-analytics-13

  2. SQL 쿼리에 의해서 실시간으로 결과가 나오는 것을 확인 할 수 있습니다. 확인 후, 우측 하단의 Close 버튼을 클릭합니다. create-analytics-14


Kinesis Analytics Application - Destination 지정

Kinesis Analytics의 SQL 수행 결과를 S3에 저장할 수 있습니다.

  1. Kinesis Analytics StreamApplication 화면에서 하단의 Destination에서 Connect to a destination 버튼을 클릭합니다. create-analytics-15

  2. Destination 정보를 지정한 후 Save and Continue 버튼을 클릭합니다. create-analytics-16

구분
Destination Kinesis Firehose delivery stream
Kinesis Firehose delivery stream Destination
In-application stream name DESTINATION_SQL_BASIC_STREAM
Output format JSON
Access to chosen resources Create / update IAM role
  1. Destination이 설정 되었습니다. Exit to Kinesis Analytics applications 텍스트를 클릭합니다. create-analytics-17

  2. 만약 EC2에서 firehose.py 실행을 중단했다면 다시 실행합니다.

  3. 몇 분 뒤 S3 bucket을 보면 설정한 Prefix인 destination 폴더가 생성된 것을 확인할 수 있습니다. create-analytics-18

  4. 해당 폴더에는 설정한대로 JSON 포맷으로 데이터가 수집됩니다. create-analytics-19