はじめに

以下の記事では ALB アクセスログを自作ログパーサーを使って収集しました。

このツールは Apache Common Log Format にも対応しているので、今回は EC2 上の httpd のアクセスログを収集してみます。なお、CLF から JSON への変換は Blueprint でもできますが、Combined 形式のログにはマッチしないようでした。

概要

CDK を使い、以下のような簡易構成を作ります。前回と違うのは ALB のところに直接 EC2 がある点と、td-agent を使ってログを転送する点です。

  • 生ログのまま S3 バケットに転送
  • S3 トリガーで呼び出した Lambda 関数で加工して Firehose に PUT
  • 変換したログを Firehose が別の S3 バケットに PUT

この構成では、EC2 上のアクセスログも ALB や CloudFront などと同じログバケットに集約するケースを想定しています。td-agent を使う方法は古くからありますが、CloudWatch Logs を挟まずに手軽に S3 に転送できる点が魅力です。

構築

リソースは前回 CDK で使ったリポジトリに少し手を加えて流用します。その上で EC2 にアクセスして td-agent を仕込みます。修正点は以下です。

  • EC2 ロールに S3 へのアクセス許可を追加
  • Lambda 関数を一部修正

EC2 ロール

EC2 ロールには今回 AmazonS3FullAccess をアタッチしました。必要に応じて絞りましょう。

01
02
03
04
05
06
07
08
09
10
11
const ec2Role = new cdk.aws_iam.Role(this, "InstanceRole", {
  roleName: `${props.serviceName}-instance-role`,
  assumedBy: new cdk.aws_iam.ServicePrincipal("ec2.amazonaws.com"),
  managedPolicies: [
    cdk.aws_iam.ManagedPolicy.fromManagedPolicyArn(this, "S3Access", "arn:aws:iam::aws:policy/AmazonS3FullAccess"),
  ],
});
new cdk.aws_iam.CfnInstanceProfile(this, "InstanceProfile", {
  instanceProfileName: `${props.serviceName}-instance-profile`,
  roles: [ec2Role.roleName],
});

Lambda 関数

デプロイする Lambda 関数ですが、前回の parser.NewALBRegexParserparser.NewApacheCLFRegexParser に差し替えるだけです。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main
 
import (
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
 
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/firehose"
    "github.com/aws/aws-sdk-go-v2/service/firehose/types"
    "github.com/aws/aws-sdk-go-v2/service/s3"
 
    parser "github.com/nekrassov01/access-log-parser"
)
 
var cfg aws.Config
 
func init() {
    var err error
    cfg, err = config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatalf("cannot load aws sdk config: %v", err)
    }
}
 
func handleRequest(ctx context.Context, event events.S3Event) error {
    buf := &bytes.Buffer{}
 
    // ここだけ変える
-   p := parser.NewALBRegexParser(ctx, buf, parser.Option{})
+   p := parser.NewApacheCLFRegexParser(ctx, buf, parser.Option{})
 
  s3client := s3.NewFromConfig(cfg)
    firehoseClient := firehose.NewFromConfig(cfg)
    for _, record := range event.Records {
        obj, err := s3client.GetObject(ctx, &s3.GetObjectInput{
            Bucket: aws.String(record.S3.Bucket.Name),
            Key:    aws.String(record.S3.Object.Key),
        })
        if err != nil {
            return err
        }
        r, err := gzip.NewReader(obj.Body)
        if err != nil {
            return err
        }
        defer r.Close()
        result, err := p.Parse(r)
        if err != nil {
            return err
        }
        b, err := json.Marshal(result)
        if err != nil {
            return err
        }
        fmt.Println(string(b))
    }
    if buf.Len() == 0 {
        return fmt.Errorf("abort process because buffer is empty")
    }
    resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{
        DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")),
        Records: []types.Record{
            {
                Data: buf.Bytes(),
            },
        },
    })
    if err != nil {
        return err
    }
    if resp != nil {
        b, err := json.Marshal(resp)
        if err != nil {
            return err
        }
        fmt.Println(string(b))
    }
    return nil
}
 
func main() {
    lambda.Start(handleRequest)
}

修正後、変更をデプロイします。

1
2
cdk synth
cdk deploy

td-agent

Amazon Linux 2 へのアクセスは EC2 Instance Connect エンドポイントを使うのがいちばん楽です。CDK のこの部分で構築しています。いちど作っておけば GUI から 1, 2 クリックで接続できます。

td-agent をインストールします。

1
curl -L https://toolbelt.treasuredata.com/sh/install-amazon2-td-agent3.sh | sh

S3 用のプラグインがインストールされているか確認します。目的の fluent-plugin-s3 が最初から入っていました。

1
2
3
$ /usr/sbin/td-agent-gem list | grep s3
aws-sdk-s3 (1.69.1)
fluent-plugin-s3 (1.3.2)

/var/log 配下にアクセスするために実行ユーザーを変更します。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
$ cat /usr/lib/systemd/system/td-agent.service
...
[Service]
User=td-agent
Group=td-agent
...
$ sudo sed -i -e "s|^User=td-agent|User=root|g" /usr/lib/systemd/system/td-agent.service
$ sudo sed -i -e "s|^Group=td-agent|Group=root|g" /usr/lib/systemd/system/td-agent.service
$ cat /usr/lib/systemd/system/td-agent.service
...
[Service]
User=root
Group=root
...

設定ファイルをバックアップします。

1
sudo cp /etc/td-agent/td-agent.conf /etc/td-agent/td-agent.conf.bk

そして td-agent の設定です。以下のようにしました。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<source>
  @type tail
  <parse>
    @type none
  </parse>
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/access_log.pos
  tag access_log
</source>
 
<match access_log>
  @type s3
  s3_bucket kawashima-test-accesslogs
  s3_region ap-northeast-1
  path "#{Socket.gethostname}/"
  time_slice_format %Y-%m-%d/%H-%M-%S
  <format>
    @type single_value
  </format>
  <buffer>
    @type file
    path /var/log/td-agent/s3
    timekey 5m
    timekey_wait 5m
    chunk_limit_size 256m
  </buffer>
</match>
  • @type none @type single_value で未加工のまま転送しています
  • path "#{Socket.gethostname}/" でホスト名を S3 のプレフィックスとして使います
  • timekey 5m timekey_wait 5m で転送間隔を 5 分ごとにします

設定をチェックします。

1
td-agent --dry-run -c /etc/td-agent/td-agent.conf

問題がなければプロセスを再起動します。

1
2
sudo systemctl restart td-agent.service
systemctl status td-agent.service

最後に、エラーが出ていないか確認しておきましょう。

1
sudo tail -f /var/log/td-agent/td-agent.log

動作確認

ここまでの設定で構築が完了しました。前回と同様に適当にブラウザアクセスし、5 分ほど待ってログバケットを確認します。EC2 のホスト名で問題なくプレフィックスが生えています。

ログが問題なく転送されています。一部マスクしていますが、未加工の状態です。前回作成した ALB の配下にいる EC2 に td-agent を仕込んだので、ヘルスチェックのログが少々邪魔ですね。また ELB 経由のアクセスなのでプライベート IP で記録されています。

1
2
3
4
5
6
7
10.0.1.109 - - [18/Mar/2024:02:26:05 +0000] "GET / HTTP/1.1" 304 - "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
10.0.1.109 - - [18/Mar/2024:02:26:12 +0000] "GET / HTTP/1.1" 200 17 "-" "ELB-HealthChecker/2.0"
10.0.0.219 - - [18/Mar/2024:02:26:12 +0000] "GET / HTTP/1.1" 200 17 "-" "ELB-HealthChecker/2.0"
10.0.1.109 - - [18/Mar/2024:02:26:42 +0000] "GET / HTTP/1.1" 200 17 "-" "ELB-HealthChecker/2.0"
10.0.0.219 - - [18/Mar/2024:02:26:42 +0000] "GET / HTTP/1.1" 200 17 "-" "ELB-HealthChecker/2.0"
10.0.0.219 - - [18/Mar/2024:02:26:42 +0000] "GET / HTTP/1.1" 200 17 "http://0.0.0.0:80/" "xxxxxx"
10.0.1.109 - - [18/Mar/2024:02:26:53 +0000] "GET / HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"

CloudWatch Logs を確認します。問題なさそうです。

ログパーサーの結果

01
02
03
04
05
06
07
08
09
10
{
    "total": 7,
    "matched": 7,
    "unmatched": 0,
    "excluded": 0,
    "skipped": 0,
    "elapsedTime": 179159,
    "source": "",
    "errors": []
}

PutRecordBatch API の結果

01
02
03
04
05
06
07
08
09
10
11
12
{
    "FailedPutCount": 0,
    "RequestResponses": [
        {
            "ErrorCode": null,
            "ErrorMessage": null,
            "RecordId": "xxx"
        }
    ],
    "Encrypted": false,
    "ResultMetadata": {}
}

次に最終的な変換後のログが行き先用の S3 バケットに配置されているか確認します。

1
2
3
4
5
6
7
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:05 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"304","size":"-","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"}
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:12 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"ELB-HealthChecker/2.0"}
{"remote_host":"10.0.0.219","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:12 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"ELB-HealthChecker/2.0"}
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:42 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"ELB-HealthChecker/2.0"}
{"remote_host":"10.0.0.219","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:42 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"ELB-HealthChecker/2.0"}
{"remote_host":"10.0.0.219","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:42 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"http://0.0.0.0:80/","user_agent":"xxxxxx"}
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:02:26:53 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"}

変換後のログが無事に出力されていました!

不要なログ行を除外してみる

今回のケースだと、EC2 側で受けた ELB ヘルスチェックのログを集約する必要はありません。ログパーサーのオプションで除外を設定できるので、対応してみます。

1
2
3
p := parser.NewApacheCLFRegexParser(ctx, os.Stdout, parser.Option{
    Filters: []string{"user_agent !~* ^ELB-HealthChecker"},
})

このように Filters に文字列で式を渡すと、有効な式かどうかを評価し、有効であればその条件に基づいて行をフィルタリングします。今回は正規表現での比較かつ否定のパターン !~ で、* を追加して case-insensitive にしています。

Lambda 関数を再度デプロイして検証してみましょう。logs を見るとちゃんと excluded にカウントされていることがわかります。

01
02
03
04
05
06
07
08
09
10
{
    "total": 7,
    "matched": 3,
    "unmatched": 0,
    "excluded": 4,
    "skipped": 0,
    "elapsedTime": 201526,
    "source": "",
    "errors": []
}

再度検証すると、ヘルスチェックのログは除外されていました。オプションを渡すだけで簡単にカスタマイズできました。

1
2
3
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:03:26:01 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"}
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:03:26:02 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"200","size":"17","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"}
{"remote_host":"10.0.1.109","remote_logname":"-","remote_user":"-","datetime":"[18/Mar/2024:03:30:10 +0000]","method":"GET","request_uri":"/stream?streams=btcusdt@depth","protocol":"HTTP/1.1","status":"404","size":"196","referer":"-","user_agent":"-"}

おわりに

td-agent を使って S3 にログを収集する方法を紹介しながら、ログパーサーが EC2 上のアクセスログの場合でも有効に機能することを確認しました。前回からの検証で収集に関してはだいぶポイントを押さえることができたので、次はもう少し大きなログ基盤を作ってみたいと思います。