tl;dr
前回の続き。
VPC Flow Logs を CloudWatch Logs Subscriptions と Kinesis Client Library for Ruby アプリケーションを利用してトラフィックログから Action が REJECT なログを Datadog Evnet に通知して可視化してみる。
可視化してみるとゾッとするくらいいろんなところからアクセスしていることがリアルタイムで判ったりして少し複雑な心境。
教材
参考
Terraform Template
以下のテンプレート一発で CloudWatch Logs Subscriptions に必要な Kinesis のストリームと IAM ロールが作成される。
Kinesis のストリーム名が決め打ちになっているのは後からでも直そう。
俺の Kinesis Client Library for Ruby アプリケーション
github.com
上記のサンプルを流用して以下を作成。
また、雑な Dockerfile で Dockernize しておいたので ECS からも利用可能だと思う。
https://hub.docker.com/r/inokappa/kcl-datadog/
構成図
前回の構成から幾つかのコンポーネントが追加。
CloudWatch Logs Subscriptions とは
サブスクリプション?
正直言って「サブスクリプション」という言葉に混乱させられたが、ドキュメントを抜粋させていただくと以下のような説明がある。
サブスクリプションを使用して CloudWatch Logs からのログイベントのリアルタイムフィードにアクセスし、カスタム処理、分析、他のシステムへのロードを行うために Amazon Kinesis ストリームに配信することができます。
ざっくり言うと…
- CloudWatch Logs に集めたログを…
- リアルタイムに Kinesis に転送
- Kinesis 上のログファイルを KCL を利用したアプリケーションでよしなに(カスタム処理、分析等)
必要な設定
設定項目 | 詳細 |
---|---|
log-group-name | CloudWatch Logs の Log Group Name を指定する(指定したグループがサブスクリプション |
フィルタパターン | Log Group Name に定義されたロググループ内のイベントをフィルタし配信先の Kinesis ストリームを制限する |
destination arn | 配信先の Kinesis ストリームのリソース名(ARN) |
role arn | Kinesis ストリームにログを配信する為に必要な権限(CloudWatch Logs に付与する IAM ロール) |
ざっくり手順
Kinesis ストリームの作成と IAM ロールの作成
これは Terraform で一気通貫に。
$ git clone https://github.com/inokappa/oreno-terraform-cloudwatch-logs-subscriptions.git $ cd oreno-terraform-cloudwatch-logs-subscriptions $ terraform apply -var 'access_key=AKxxxxxxxxxxxxxxxxxxxxxxxx' -var 'secret_key=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
ストリーム名が決め打ちとなっているので注意!
サブスクリプションフィルタの定義
Kinesis ストリームにログを飛ばす為にログのフィルタを定義する。
aws --region ap-northeast-1 logs put-subscription-filter --log-group-name "my-flow-log" --filter-name "Reject-Filter" --filter-pattern '[version,account_id,interface_id,srcaddr,dstaddr,srcport,dstport,protocol,packets,bytes,start,end,action = REJECT,log_status]' --destination-arn "arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-flow-log" --role-arn "arn:aws:iam::123456789012:role/CWLtoKinesisRole"
フィルタの定義については以下のドキュメントが非常に参考になった。
今回は action
フィールドに REJECT
が入っているログを対象とするので以下のようにフィルタを定義する。
--filter-pattern '[version,account_id,interface_id,srcaddr,dstaddr,srcport,dstport,protocol,packets,bytes,start,end,action = REJECT,log_status]'
KCL アプリケーションの起動
以下のようにコンテナを起動する。(動作確認の為、AWS の権限についてはフルアクセスの権限を付与して確認を行ったが、実際の利用に際しては適切な権限を付与するべき)
docker run -d --name='kcl-datadog' --env='STREAM_NAME=${Kinesis Stream Name}' --env='REGION_NAME=${AWS Region}' --env='AWS_ACCESS_KEY=${AWS ACCESS KEY} ' --env='AWS_SECRET_KEY=${AWS SECRET ACCESS KEY}' --env='DATADOG_API_KEY=${Datadog API Key}' inokappa/kcl-datadog
コンテナを起動すると以下のようにログを確認してみる。
$ docker logs -f kcl-datadog (snip) Sep 04, 2015 6:28:26 AM com.amazonaws.services.kinesis.multilang.GetNextMessageTask handleLine INFO: Skipping unexpected line on STDOUT for shard shardId-000000000000: Rejected the 23 port access from 110.53.xxx.xxx Sep 04, 2015 6:28:26 AM com.amazonaws.services.kinesis.multilang.GetNextMessageTask handleLine INFO: Skipping unexpected line on STDOUT for shard shardId-000000000000: Rejected the 1433 port access from xxx.xxx.83.13 Sep 04, 2015 6:28:26 AM com.amazonaws.services.kinesis.multilang.GetNextMessageTask handleLine INFO: Skipping unexpected line on STDOUT for shard shardId-000000000000: Rejected the 445 port access from 109.xx.xxx.175 Sep 04, 2015 6:28:26 AM com.amazonaws.services.kinesis.multilang.GetNextMessageTask handleLine INFO: Skipping unexpected line on STDOUT for shard shardId-000000000000: Rejected the 3128 port access from 58.xxx.xxx.157 Sep 04, 2015 6:28:26 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
アプリケーションの作り等は非常に雑なので注意されたし…。
Datadog Event 画面
うわってなりそうな位に各所からいろんなポートに対してのアクセスが発生していることが判る。
Event から Monitor を使って通知を定義することが出来るので、さらに不正と思われるようなアクセスに関してはメールや Slack 等で通知を送ることも出来そう。
メモ
CloudWatch Logs Subscriptions と Kinesis
- すいません、Kinesis は良く解ってません
- KCL で取得出来るレコードは gzip にて圧縮されていて Base64 でエンコードされている(取得して展開する場合に注意する)
Amazon Kinesis Client Library for Ruby について
- Java 版 KCL のラッパーっぽい
- サンプルを少し修正するだけで簡単なアプリケーションが作れる(初心者に優しい)
- レコードの展開については以下のように実装した
events = JSON.parse(Zlib::GzipReader.new(StringIO.new(Base64.decode64(record['data']))).read) events['logEvents'].each do |event| puts "Rejected the #{event['extractedFields']['dstport']} port access from #{event['extractedFields']['srcaddr']}" emit_to_datadog("Rejected the #{event['extractedFields']['dstport']} port access from #{event['extractedFields']['srcaddr']}", event['extractedFields']['end']) end
Base64 をデコードしたレコードを unzip する処理については以下の記事が参考になった。
実際に irb で動かしてみてさらに納得。ありがとうございます。
- 尚、KCL for Ruby に関しては以下の記事がとても参考になった
ありがとうございます。
ということで…
VPC Flow Logs について深く掘り下げるつもりが脱線してしまったので、もう少し掘り下げてみたいと考えている。
以上。