はじめに


こんにちは、MSPセクションの秋山です。
AWS WAFログを Athena で分析しましたので紹介します。

前提


きっかけ

複数のS3-CloudFrontの静的Webホスティングを持つ環境でBOT攻撃の兆候がありました。
ただ、ログを取得しておらず詳細な調査が実施できませんでした。

方針

不審なアクセスに関して詳細な調査を行うため、以下の通り対応することにしました。

  • CloudFrontにAWS WAF WebACLを適用し、WAFログを取得するよう設定する。
  • コストの観点から複数のCloudFrontに対して、共通のAWS WAF WebACLを適用する。
  • WAFログはS3に出力する。

WAFログにはルールマッチング等の情報が含まれるため、CloudFrontの標準ログ (アクセスログ)ではなく、
WAFログを取得することとしました。

課題

上記の場合、一つのWAFログ(S3バケット)に全てのCloudFrontに関するログが出力されます。
特定のCloudFrontに関するログを調査したい場合に面倒です、、、

対策

LambdaやCloudWatch Logsサブスクリプションを活用することで、
CloudFrontごとにWAFログを分けて保存することもできるようですが、
今回はAthenaでWAFログをクエリするという方法を試してみました。

手順


1.WAFログをS3に保存するよう設定する

AWS WAF > WebACLs > 対象のWebACL > Logging and metricsタグ
事前に作成したWAFログ保存用のS3バケットを指定するか、ここで新規にS3バケットを作成して指定します。

2.Athenaクエリログ保存用S3バケットを作成する

Amazon S3 > バケット > バケット作成

3.クエリ実行結果の保存先を設定する

Amazon Athena > クエリエディタ > 設定タグ
手順2で作成したS3バケットを指定します。

4.データベースを作成する

Amazon Athena > クエリエディタ > クエリ欄で以下を実行(データベース名は「waflogs」)

CREATE DATABASE waflogs

5.テーブルを作成する

Amazon Athena > クエリエディタ > データベース名で「waflogs」を選択

クエリ欄で以下を実行(テーブル名は「waflogs」)

CREATE EXTERNAL TABLE `テーブル名`(
`timestamp` bigint,
`formatversion` int,
`webaclid` string,
`terminatingruleid` string,
`terminatingruletype` string,
`action` string,
`terminatingRuleMatchDetails` array < struct <
conditionType: string,
location: string,
matchedData: array < string >
> >,
`httpsourcename` string,
`httpsourceid` string,
`ruleGroupList` array < struct <
ruleGroupId: string,
terminatingRule: struct < ruleId: string, action: string >,
nonTerminatingMatchingRules: array < struct < action: string, ruleId: string > >,
excludedRules: array < struct < exclusionType: string, ruleId: string > >
> >,
`ratebasedrulelist` array<
struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int >
>,
`nonterminatingmatchingrules` array<
struct< ruleid:string, action:string >
>,
`httprequest` struct<
clientip:string,
country:string,
headers:array<
struct< name:string, value:string >
>,
uri:string,
args:string,
httpversion:string,
httpmethod:string,
requestid:string
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 'WAFログを保存しているS3バケットのオブジェクトURL'

6.クエリ実行

対象CloudFrontに関するIPアドレス別のアクセス数の集計するには、以下のクエリを実行します。

SELECT httprequest.clientip,
COUNT(httprequest.clientip) AS count
FROM
"テーブル名"
WHERE
DATE_FORMAT(FROM_UNIXTIME(timestamp/1000, 'Asia/Tokyo'), '%Y-%m-%d %H:%i:%s')
BETWEEN 'xxxx-xx-xx xx:xx:xx'
AND 'xxxx-xx-xx xx:xx:xx'
AND httpsourceid = '対象のディストリビューションID'
GROUP BY
httprequest.clientip;

クエリ実行時には、データベース名とテーブル名で「waflogs」を選択しておきます。

クエリの実行結果は、コンソール上で確認することもできますし、CSVファイルをダウンロードすることもできます。

最後までご覧いただきありがとうございました!