5.4.VPC Flow 기반 모니터링2

Update : 2020-03-06

VPC Flow, S3, Athena, QuickSight 기반 모니터링 구성 소개

이 랩에서는 "5.3 VPC Flow 기반 모니터링1" 와는 다르게 VPC Flow Log를 S3로 전송하고, S3에 쌓여진 데이터를 Athena에서 호출해서 데이터셋을 만들고, QuickSight를 통해 시각화하는 방법을 구성합니다.

또한 CloudWatch에서 특정 지표를 만들어서 VPC Flow log내에서 특정 위협이 발생했을때 SNS를 통해서 이메일로 전송하도록 합니다.

VPC Flow, S3 구성

1. VPC Flow를 수신할 S3를 생성합니다.

먼저 VPCFlow를 수신할 S3 버킷을 생성합니다.

[S3] - [ Buckets] - [Create Bucket]

[S3] - [ Buckets]

Bucket ARN은 뒤에서도 활용되므로 복사해 둡니다.

2. VPC Flow Log를 생성합니다.

VPC Flow Log를 생성합니다.

[VPC] - [Flow Logs] - [Create Flow log]

VPC Flow 목적지를 S3로 전송합니다. 해당 버킷의 위치는 앞서 복사한 ARN값을 입력합니다.

2. 추가로 SNS와 연동하고 Cloudwatch 로그그룹으로 전송할 VPC Flow Log를 구성합니다.

추가로 CloudWatch Alram을 만들고 SNS로 보낼 VPC Flow Log도 [그림 5.4.7]에서 처럼 생성합니다. CloudWatch로 보내는 방법과 Role 생성 등의 상세 방법 "5.3 VPC Flow 기반 모니터링1"을 참조합니다.

3. SNS와 연동할 CloudWatch 로그 그룹에 Metric Filter를 생성합니다.

[Cloudwatch] - [ Logs ] - [Log groups] - [Create Metric Filter]

필터를 추가하기 위해 생성된 Log 스트림 중에 한개를 선택해서 메세지 필드 타입을 상세 확인해 봅니다.

4. SNS와 연동할 CloudWatch 로그 그룹에 Metric Filter를 정합니다.

아래 [그림 5.4.13]에서 처럼 80포트로 접속하는 서버에 필터를 적용합니다. 실제 랩에서는 Bastion SVR (10.0.0.155)가 ITSVR에 접근할 때 Filter가 걸리도록 적용했습니다. Filter는 임의대로 적용 가능합니다.

Filter Pattern은 아래와 같이 입력합니다.

[version, account, eni, source="10.0.0.155", destination, srcport, destport="80", protocol="6", packets, bytes, windowstart, windowend, action="REJECT", flowlogstatus]

5. SNS와 연동할 CloudWatch 로그 그룹에 Metric Filter가 생성된 것을 확인하고, Create Alarm을 생성합니.

6. CloudWatch Alram 1, 2 단계를 생성합니.

[그림 5.4.16]에서 처럼 Cloudwatch Alram을 생성합니다. 트리거를 "Alram"을 선택하고, Topic을 생성합니다. 또한 알람이 발생하고, 해당 내용을 수신할 이메일 주소를 입력합니다. 마지막으로 Topic을 생성합니다.

[Cloudwatch] - [Alarms] - [ Create Alarm]

7. Topic을 생성합니.

Metric의 이름과 Condition을 정의합니다. 아래 예는 이벤트가 300초 동안 3번 이상 발생했을 경우 트리거 됩니다.

8. 수신된 이메일을 확인합니다.

앞서 트리거가 발생된 후 전송할 이메일이 정상적으로 수신되었는지 확인합니다. 해당 메일에서 Subscription을 아래와 같이 Confirm해야 정상 발송이 시작됩니다.

9. SNS에서 수신을 확인합니다.

SNS에서 해당 Subscription이 정상적으로 Confirmed 되었는지 확인합니다.

[SNS] - [Topic] - [ ITSVR-HTTP-ACCESS]

10. CloudWatch Alram 3, 4 단계를 생성합니.

6번에서 진행했던 나머지 3,4, 단계를 마무리합니다. Alram 이름을 정의합니다.

[Cloudwatch] -[ Alarms] - [Create Alarm]

11. CloudWatch Logs 에서 간단하게 insight 분석할 수 있습니다. (Options)

12. Event를 수신하고 정상적으로 SNS를 통해 이메일이 수신되는지 확인합니다.

Bastion Server 에서 ITSVR-A로 HTTP 접속을 시도합니다.

Reject에 대한 Event를 SNS를 통해 이메일 발송하게 되므로, Security Group에서 Reject가 되도록 사전에 Rule이 정의 되어 있어야 하므로 적절하게 구성해야 합니다. 아래 Filter 예를 몇가지 표기합니다.

[version, account, eni, source="10.0.0.155", destination, srcport, destport="80", protocol="6", packets, bytes, windowstart, windowend, action="REJECT", flowlogstatus]
# Source 10.0.0.155에서 http로 접속하고 REJECT

[version, account, eni, source, destination="10.0.14.100", srcport, destport="80", protocol="6", packets, bytes, windowstart, windowend, action="REJECT", flowlogstatus]
# Destination 10.0.14.100으로 http로 접속하고 REJECT

[version, account, eni, source, destination, srcport, destport="22", protocol="6", packets, bytes, windowstart, windowend, action="REJECT", flowlogstatus]
# Destination ssh로 접속하고 REJECT

[version, account, eni, source, destination, srcport, destport="80", protocol="6", packets, bytes, windowstart, windowend, action="REJECT", flowlogstatus]
# Destination http로 접속하고 REJECT

예제에서는 HTTP로 접속하고 , 3차례 접속을 시도하여 미리 만든 룰을 위배시킵니다.

curl http://ITSVR-A

curl http://ITSVR-A

curl http://ITSVR-A

300초간 3차례 HTTP 접속을 시도하는 것에 대해 룰을 만들었으므로, 아래와 같이 이메일이 통지되었습니다.

13. Cloudwatch Alram을 확인합니.

Cloudwatch에서도 정상적으로 Alarm이 수신되는지 확인합니다.

[Cloudwatch] - [Alrams] - [Alarm]

VPC Flow를 분석하여 특정 패턴에 대해 로그가 발생할 경우 위의 예제에서 처럼 SNS와 연동을 통해 메일을 보내거나, Lambda를 SNS와 연동하여 Slack으로 전송할 수 있습니다.

Athena 구성

앞서 VPC Flow log의 전송 목적지를 S3로 보내는 것을 생성하였습니다.

이제 S3의 데이터를 Athena에서 Data Set으로 생성하고, Query를 통해 확인해 보겠습니다.

1.S3 에 수집된 VPC Flow데이터를 확인합니다.

아래 그림과 같이 S3 Bucket을 선택하여 폴더 구조로 확인을 해 보면 정상적으로 VPC Flow가 수신된 것을 확인 할 수 있습니다.이 데이터를 가지고 Athena에서 DataSet을 생성합니다.

[S3] - [Bucket 선택]

2. Athena 생성합니다.

Athena를 생성합니다.

3.먼저 DataSource에 대한 DLD를 생성합니다.

Datasource와 Database는 기본을 선택합니다. 먼저 DLD를 위해 Query를 생성합니다. Source Code는 아래를 참조합니다.

[Athena] - [Query Editor]

CREATE EXTERNAL TABLE IF NOT EXISTS default.vpc_flow_logs (
  version int,
  account string,
  interfaceid string,
  sourceaddress string,
  destinationaddress string,
  sourceport int,
  destinationport int,
  protocol int,
  numpackets int,
  numbytes bigint,
  starttime int,
  endtime int,
  action string,
  logstatus string
)  
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://{your_log_bucket}/AWSLogs/{account_id}/vpcflowlogs/us-east-1/'
TBLPROPERTIES ("skip.header.line.count"="1");

4.먼저 DataSource에 대한 파티션을 추가 생성성합니다.

파티션에 대한 날짜 데이터를 추가하기 위해 아래에서 처럼 추가해 줍니다.

ALTER TABLE default.vpc_flow_logs
ADD PARTITION (dt='{Year}-{Month}-{Day}')
location 's3://{your_log_bucket}/AWSLogs/{account_id}/vpcflowlogs/us-east-1/{Year}/{Month}/{Day}';

5. Query를 통해 정상적으로 결과가 호출되는지 확인합니다.

Select문은 아래를 참조합니다.

SELECT day_of_week(from_iso8601_timestamp(dt)) AS
  day,
  dt,
  interfaceid,
  sourceaddress,
  destinationport,
  action,
  protocol
FROM vpc_flow_logs
WHERE action = 'REJECT' AND protocol = 6
order by sourceaddress
LIMIT 100;

이제 Athena에서 만들어진 데이터셋을 통해 QuickSight에서 어떻게 표현 되는지 확인합니다.

QuickSight 구성

1.먼저 Quicksight Account를 임의로 생성합니다.

Athena를 Source로 사용하기 때문에 선택해 줍니다.

2.DataSet을 Athena를 선택해 줍니다.

3. US-EAST-1에 생성한 Athena Table을 호출합니다.

4. 추출된 DataSet의 Fields list의 주요 Key값을 통해 QuickSight에서 시각화하여 분석해 봅니다.

아래 예는 출발지와 목적지 주소, Security Group에서 Accept/Reject 현황, ENI에서 발생되는 카운트, Byte 총 수량, Packet 총 수량등을 분석하여 시각화 한 예입니다.

지금까지 VPC Flow Log를 CloudWatch, S3등으로 전송하여 ELK, QuickSight등으로 분석하고, 주요 보안 룰에 위배되는 경우 SNS와 연동하여 이메일이나 Slack과 같은 협업도구에 전송하는 방법을 알아보았습니다.

Last updated