Kinesis Analytics를 활용하여 스트리밍 데이터를 처리할 application을 생성하고 처리된 결과를 S3에 쌓을 수 있도록 Firehose에 연결합니다.
Kinesis Analytics를 활용하여 스트리밍 데이터를 처리할 application을 생성합니다.
AWS Management Console에서 Kinesis 서비스에 접속합니다.
Kinesis analytics applications 의 Create analytics application 버튼을 클릭합니다.

Application name에 StreamApplication을 입력하고, Description에는 Streaming data processing application를 입력합니다. Runtime은 SQL 을 선택하고, 우측 하단의 Create application 버튼을 클릭합니다.

성공적으로 Kinesis Analytics Application 이 만들어 졌습니다.
Kinesis Analytics Application은 크게 3가지 영역으로 구성되어 있습니다.
- 데이터 유입을 위한 Source 영역
- Real time analytics 작업을 위한 영역
- 분석 결과를 받아줄 Destination 영역
데이터를 수집하기 위한 source를 지정합니다.
위 생성된 화면에서 Connect streaming data 버튼을 클릭합니다.
Choose source를 선택하고, Source의 서비스는 Kinesis Firehose delivery stream 을 선택하고 앞서 생성한 2개의 스트림 중 Source 를 선택합니다.

그리고 하단의 Discover schema 버튼을 클릭하여 Schema를 분석합니다.

Schema 분석이 끝나면 아래에 샘플 데이터를 보여줍니다. 데이터 타입을 바르게 수정하기 위해 Edit schema 버튼을 클릭합니다.
이 때 Source data가 없다는 문구가 표시된다면 EC2에서 firehose.py가 실행 중인지 확인 합니다.
OccurrenceStartDate 와 DiscoveryDate 의 타입을 DATE 로 변경합니다.
NetLoss, RecoveryAmount, EstimatedGrossLoss 는 DOUBLE 로 변경합니다.
우측 하단의 Save schema and update stream samples 버튼을 클릭하여 저장합니다.

저장이 진행되는 것을 확인할 수 있습니다.

저장이 완료되면 Exit(done) 텍스트를 클릭하고, Source 지정을 완료합니다.

Source 지정이 성공적으로 완료된 것을 확인합니다.

실시간 데이터 분석을 위해서 SQL을 생성합니다.
Go to SQL Editor 버튼을 클릭합니다.

Text editor에 다음과 같은 SQL문을 입력한 뒤 Save and run SQL 버튼을 클릭합니다.

CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM"
(Region VARCHAR(16), Business VARCHAR(32), Name VARCHAR(16), Status VARCHAR(16), RiskCategory VARCHAR(64),
RiskSubCategory VARCHAR(64), DiscoveryDate DATE, OccurrenceStartDate DATE, NetLoss DOUBLE, RecoveryAmount DOUBLE,
EstimatedGrossLoss DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "DESTINATION_SQL_BASIC_STREAM"
SELECT STREAM "Region", "Business", "Name", "Status", "RiskCategory", "RiskSubCategory", "DiscoveryDate",
"OccurrenceStartDate", "NetLoss", "RecoveryAmount", "EstimatedGrossLoss" FROM "SOURCE_SQL_STREAM_001";
이 때 Source data가 없다는 문구가 표시된다면 EC2에서 firehose.py가 실행 중인지 확인 후 Refresh 합니다.
SQL 쿼리가 수행을 준비하는 것을 확인할 수 있습니다.

SQL 쿼리에 의해서 실시간으로 결과가 나오는 것을 확인 할 수 있습니다. 확인 후, 우측 하단의 Close 버튼을 클릭합니다.

Kinesis Analytics의 SQL 수행 결과를 S3에 저장할 수 있습니다.
Kinesis Analytics StreamApplication 화면에서 하단의 Destination에서 Connect to a destination 버튼을 클릭합니다.

Destination 정보를 지정한 후 Save and Continue 버튼을 클릭합니다.

| 구분 | 값 |
|---|---|
| Destination | Kinesis Firehose delivery stream |
| Kinesis Firehose delivery stream | Destination |
| In-application stream name | DESTINATION_SQL_BASIC_STREAM |
| Output format | JSON |
| Access to chosen resources | Create / update IAM role |
Destination이 설정 되었습니다. Exit to Kinesis Analytics applications 텍스트를 클릭합니다.

만약 EC2에서 firehose.py 실행을 중단했다면 다시 실행합니다.
몇 분 뒤 S3 bucket을 보면 설정한 Prefix인 destination 폴더가 생성된 것을 확인할 수 있습니다.

해당 폴더에는 설정한대로 JSON 포맷으로 데이터가 수집됩니다.
