はじめに
こんにちは、MSPセクションの秋山です。
AWS WAFログを Athena で分析しましたので紹介します。
前提
きっかけ
複数のS3-CloudFrontの静的Webホスティングを持つ環境でBOT攻撃の兆候がありました。
ただ、ログを取得しておらず詳細な調査が実施できませんでした。
方針
不審なアクセスに関して詳細な調査を行うため、以下の通り対応することにしました。
- CloudFrontにAWS WAF WebACLを適用し、WAFログを取得するよう設定する。
- コストの観点から複数のCloudFrontに対して、共通のAWS WAF WebACLを適用する。
- WAFログはS3に出力する。
WAFログにはルールマッチング等の情報が含まれるため、CloudFrontの標準ログ (アクセスログ)ではなく、
WAFログを取得することとしました。
課題
上記の場合、一つのWAFログ(S3バケット)に全てのCloudFrontに関するログが出力されます。
特定のCloudFrontに関するログを調査したい場合に面倒です、、、
対策
LambdaやCloudWatch Logsサブスクリプションを活用することで、
CloudFrontごとにWAFログを分けて保存することもできるようですが、
今回はAthenaでWAFログをクエリするという方法を試してみました。
手順
1.WAFログをS3に保存するよう設定する
AWS WAF > WebACLs > 対象のWebACL > Logging and metricsタグ
事前に作成したWAFログ保存用のS3バケットを指定するか、ここで新規にS3バケットを作成して指定します。
2.Athenaクエリログ保存用S3バケットを作成する
Amazon S3 > バケット > バケット作成
3.クエリ実行結果の保存先を設定する
Amazon Athena > クエリエディタ > 設定タグ
手順2で作成したS3バケットを指定します。
4.データベースを作成する
Amazon Athena > クエリエディタ > クエリ欄で以下を実行(データベース名は「waflogs」)
CREATE DATABASE waflogs
5.テーブルを作成する
Amazon Athena > クエリエディタ > データベース名で「waflogs」を選択
クエリ欄で以下を実行(テーブル名は「waflogs」)
CREATE EXTERNAL TABLE `テーブル名`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingRuleMatchDetails` array < struct < conditionType: string, location: string, matchedData: array < string > > >, `httpsourcename` string, `httpsourceid` string, `ruleGroupList` array < struct < ruleGroupId: string, terminatingRule: struct < ruleId: string, action: string >, nonTerminatingMatchingRules: array < struct < action: string, ruleId: string > >, excludedRules: array < struct < exclusionType: string, ruleId: string > > > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION 'WAFログを保存しているS3バケットのオブジェクトURL'
6.クエリ実行
対象CloudFrontに関するIPアドレス別のアクセス数の集計するには、以下のクエリを実行します。
SELECT httprequest.clientip, COUNT(httprequest.clientip) AS count FROM "テーブル名" WHERE DATE_FORMAT(FROM_UNIXTIME(timestamp/1000, 'Asia/Tokyo'), '%Y-%m-%d %H:%i:%s') BETWEEN 'xxxx-xx-xx xx:xx:xx' AND 'xxxx-xx-xx xx:xx:xx' AND httpsourceid = '対象のディストリビューションID' GROUP BY httprequest.clientip;
クエリ実行時には、データベース名とテーブル名で「waflogs」を選択しておきます。
クエリの実行結果は、コンソール上で確認することもできますし、CSVファイルをダウンロードすることもできます。
最後までご覧いただきありがとうございました!