はじめに

以前、ログパーサーを Go で自作してみた件をブログに書きました。AWS 環境で使うことを前提に作ったので、さまざまな AWS リソースのログ解析機能がプリセットされています。

これも地味に改善を続けていまして、現在では以下のような機能が加わり、使い勝手がよくなっています (半分宣伝)

  • ラベル名でカラムの絞り込みをできるようにした
  • size < 100 method == GET remote_host =~ ^192.168. のようなフィルタ式で行を絞り込めるようにした
  • tail -f の出力をパイプから流し込めるようにした (リアルタイム解析できるようにした)
  • bufio.Scanner から io.Writer に流し込むように設計を変更し、メモリにやさしくパフォーマンスも改善

このツール、CLI として配布していますがコア機能はすべて Go のモジュールとして再利用可能です。

GitHub - nekrassov01/access-log-parser: Simple access log parser utilities written in Go
Simple access log parser utilities written in Go. Contribute to nekrassov01/access-log-parser development by creating an account on GitHub.

今回は AWS でのログ解析ワークロードがこのモジュールでどれだけ簡単/効率的に実装できるかを検証しました。焦点としては Lambda の記述をいかに軽くできるかになりますが、かなり簡素化できましたのでぜひご覧ください。

概要

CDK を使い、以下のような簡易構成を作ります。

構築

CDK で各リソースを作ります。リポジトリはこちらです。

GitHub - nekrassov01/cdk-alb-logparser: Sample of parsing ALB access logs with Lambda and put to Firehose
Sample of parsing ALB access logs with Lambda and put to Firehose - nekrassov01/cdk-alb-logparser

Construct を以下のように分けています。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
.
├── bin
:   └── main.ts
├── lib
│   ├── constructs
│   │   ├── handler.ts
│   │   ├── network.ts
│   │   ├── service.ts
│   │   └── stream.ts
:   └── stack.ts
├── src
│   ├── ec2
│   │   └── userdata.sh
│   └── lambda
│       └── logparser
│           ├── go.mod
│           ├── go.sum
:           └── main.go

Network

VPC の作成です。特に変わったことはしていないので割愛します。ソースはこちら

Service

以下を作っています。ソースはこちら

  • EC2 関連リソース
    • ロール
    • セキュリティグループ
    • 起動テンプレート
    • Auto Scaling グループ
  • ALB 関連リソース
    • セキュリティグループ
    • HTTP リスナー
    • HTTPS リスナー
    • ターゲットグループ
    • ACM 証明書
    • ログバケット
  • EC2 Instance Connect Endpoint (踏み台用)
  • ALB エイリアスレコード

Stream

Amazon Data Firehose デリバリーストリームを作ります。以下のようなコードにしました。今回は S3 トリガーでログの変換を行うので、Lambda 関数は統合していません。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import * as cdk from "aws-cdk-lib";
import { ApplicationLoadBalancer } from "aws-cdk-lib/aws-elasticloadbalancingv2";
import { Construct } from "constructs";
 
export interface StreamProps {
  serviceName: string;
  alb: ApplicationLoadBalancer;
}
 
export class Stream extends Construct {
  readonly deliveryStream: cdk.aws_kinesisfirehose.CfnDeliveryStream;
 
  constructor(scope: Construct, id: string, props: StreamProps) {
    super(scope, id);
    const stack = cdk.Stack.of(this);
 
    // Create S3 bucket for transformed logs
    const dstBucket = new cdk.aws_s3.Bucket(this, "DestinationBucket", {
      bucketName: `${props.serviceName}-destination`,
      blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL,
      publicReadAccess: false,
      encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED,
      enforceSSL: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
      versioned: false,
      objectOwnership: cdk.aws_s3.ObjectOwnership.BUCKET_OWNER_PREFERRED,
    });
 
    // Create role for firehose stream
    const firehoseRole = new cdk.aws_iam.Role(this, "FirehoseRole", {
      roleName: `${props.serviceName}-firehose-role`,
      assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"),
      inlinePolicies: {
        ["FirehoseRoleAdditionalPolicy"]: new cdk.aws_iam.PolicyDocument({
          statements: [
            new cdk.aws_iam.PolicyStatement({
              effect: cdk.aws_iam.Effect.ALLOW,
              actions: [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
              ],
              resources: [dstBucket.bucketArn, `${dstBucket.bucketArn}/*`],
            }),
            new cdk.aws_iam.PolicyStatement({
              effect: cdk.aws_iam.Effect.ALLOW,
              actions: ["logs:PutLogEvents"],
              resources: [`arn:aws:logs:${stack.region}:${stack.account}:log-group:/aws/kinesisfirehose/*`],
            }),
          ],
        }),
      },
    });
 
    // Create log group and stream for firehose error logs
    const firehoseFailLogGroup = new cdk.aws_logs.LogGroup(this, "FirehoseFailLogGroup", {
      logGroupName: `/aws/kinesisfirehose/${props.serviceName}-firehose/fail`,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      retention: cdk.aws_logs.RetentionDays.THREE_DAYS,
    });
    const firehoseFailLogStream = new cdk.aws_logs.LogStream(this, "FirehoseFailLogStream", {
      logGroup: firehoseFailLogGroup,
      logStreamName: "logs",
    });
 
    // Create firehose delivery stream
    this.deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(this, "Firehose", {
      deliveryStreamName: `${props.serviceName}-firehose`,
      deliveryStreamType: "DirectPut",
      s3DestinationConfiguration: {
        bucketArn: dstBucket.bucketArn,
        roleArn: firehoseRole.roleArn,
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: firehoseFailLogGroup.logGroupName,
          logStreamName: "logs",
        },
        compressionFormat: "GZIP",
        errorOutputPrefix: "/errors",
        bufferingHints: {
          sizeInMBs: 5,
          intervalInSeconds: 300,
        },
      },
    });
 
    this.deliveryStream._addResourceDependency(dstBucket.node.defaultChild as cdk.CfnResource);
    this.deliveryStream._addResourceDependency(firehoseFailLogGroup.node.defaultChild as cdk.CfnResource);
    this.deliveryStream._addResourceDependency(firehoseFailLogStream.node.defaultChild as cdk.CfnResource);
  }
}

Handler

Lambda のリソース設定です。Go なので、ランタイムは Amazon Linux 2 を選択し、ローカルファイルから Docker でビルドします。このあたりは以前の記事を参考にしてください。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
 
export interface HandlerProps {
  serviceName: string;
  albLogBucket: cdk.aws_s3.Bucket;
  deliveryStream: cdk.aws_kinesisfirehose.CfnDeliveryStream;
}
 
export class Handler extends Construct {
  constructor(scope: Construct, id: string, props: HandlerProps) {
    super(scope, id);
    const stack = cdk.Stack.of(this);
 
    // Create role for lambda function
    const logParserRole = new cdk.aws_iam.Role(this, "LogParserRole", {
      roleName: `${props.serviceName}-logparser-role`,
      assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"),
      inlinePolicies: {
        ["HandlerLogParserRoleAdditionalPolicy"]: new cdk.aws_iam.PolicyDocument({
          statements: [
            new cdk.aws_iam.PolicyStatement({
              effect: cdk.aws_iam.Effect.ALLOW,
              actions: ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
              resources: ["arn:aws:logs:*:*:*"],
            }),
            new cdk.aws_iam.PolicyStatement({
              effect: cdk.aws_iam.Effect.ALLOW,
              actions: ["firehose:DescribeStream", "firehose:GetRecords", "firehose:PutRecordBatch"],
              resources: [`arn:aws:firehose:${stack.region}:${stack.account}:deliverystream/*`],
            }),
            new cdk.aws_iam.PolicyStatement({
              effect: cdk.aws_iam.Effect.ALLOW,
              actions: ["s3:GetObject", "s3:GetBucketLocation", "s3:ListBucket"],
              resources: [`arn:aws:s3:::*`],
            }),
          ],
        }),
      },
    });
 
    // Create DLQ
    const logParserDLQ = new cdk.aws_sqs.Queue(this, "LogParserQueue", {
      queueName: `${props.serviceName}-logparser-dlq`,
      encryption: cdk.aws_sqs.QueueEncryption.SQS_MANAGED,
      enforceSSL: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      retentionPeriod: cdk.Duration.days(7),
    });
 
    // Create lambda function
    const logParser = new cdk.aws_lambda.Function(this, "LogParser", {
      functionName: `${props.serviceName}-logparser`,
      description: "Parsing ALB logs and put to firehose",
      code: cdk.aws_lambda.Code.fromAsset("src/lambda/logparser", {
        bundling: {
          image: cdk.DockerImage.fromRegistry("golang:1.22.0"),
          command: [
            "bash",
            "-c",
            [
              "export GOCACHE=/tmp/go-cache",
              "export GOPATH=/tmp/go-path",
              "CGO_ENABLED=0 GOOS=linux go build -tags lambda.norpc -o /asset-output/bootstrap main.go",
            ].join(" && "),
          ],
        },
      }),
      handler: "bootstrap",
      architecture: cdk.aws_lambda.Architecture.ARM_64,
      runtime: cdk.aws_lambda.Runtime.PROVIDED_AL2,
      role: logParserRole,
      logRetention: cdk.aws_logs.RetentionDays.THREE_DAYS,
      currentVersionOptions: {
        removalPolicy: cdk.RemovalPolicy.RETAIN,
      },
      deadLetterQueueEnabled: true,
      deadLetterQueue: logParserDLQ,
      timeout: cdk.Duration.minutes(5),
      reservedConcurrentExecutions: 1,
      retryAttempts: 2,
      environment: {
        FIREHOSE_STREAM_NAME: props.deliveryStream.deliveryStreamName!,
      },
    });
 
    // Update function alias
    const alias = new cdk.aws_lambda.Alias(scope, "LogParserAlias", {
      aliasName: "live",
      version: logParser.currentVersion,
    });
 
    // Set function alias to S3 event
    props.albLogBucket.addEventNotification(
      cdk.aws_s3.EventType.OBJECT_CREATED_PUT,
      new cdk.aws_s3_notifications.LambdaDestination(alias)
    );
  }
}

環境変数で Firehose のストリーム名を渡しています。

1
2
3
environment: {
  FIREHOSE_STREAM_NAME: props.deliveryStream.deliveryStreamName!,
},

あとあとのことを考えて、エイリアスを使うようにしておいたほうがよいでしょう。

1
2
3
4
5
// Update function alias
const alias = new cdk.aws_lambda.Alias(scope, "LogParserAlias", {
  aliasName: "live",
  version: logParser.currentVersion,
});

最後に、props で渡したログバケットに対して S3 イベントを設定します。

1
2
3
4
5
// Set function alias to S3 event
props.albLogBucket.addEventNotification(
  cdk.aws_s3.EventType.OBJECT_CREATED_PUT,
  new cdk.aws_s3_notifications.LambdaDestination(alias)
);

ここまでで定義したリソースをデプロイし、環境が構築されたことを確認します。

1
2
cdk synth
cdk deploy

ログ解析関数

この記事のメインである Lambda 関数です。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
 
import (
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
 
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/firehose"
    "github.com/aws/aws-sdk-go-v2/service/firehose/types"
    "github.com/aws/aws-sdk-go-v2/service/s3"
 
    parser "github.com/nekrassov01/access-log-parser"
)
 
var cfg aws.Config
 
func init() {
    var err error
    cfg, err = config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatalf("cannot load aws sdk config: %v", err)
    }
}
 
func handleRequest(ctx context.Context, event events.S3Event) error {
    buf := &bytes.Buffer{}
    p := parser.NewALBRegexParser(ctx, buf, parser.Option{})
    s3client := s3.NewFromConfig(cfg)
    firehoseClient := firehose.NewFromConfig(cfg)
    for _, record := range event.Records {
        obj, err := s3client.GetObject(ctx, &s3.GetObjectInput{
            Bucket: aws.String(record.S3.Bucket.Name),
            Key:    aws.String(record.S3.Object.Key),
        })
        if err != nil {
            return err
        }
        r, err := gzip.NewReader(obj.Body)
        if err != nil {
            return err
        }
        defer r.Close()
        result, err := p.Parse(r)
        if err != nil {
            return err
        }
        b, err := json.Marshal(result)
        if err != nil {
            return err
        }
        fmt.Println(string(b))
    }
    if buf.Len() == 0 {
        return fmt.Errorf("abort process because buffer is empty")
    }
    resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{
        DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")),
        Records: []types.Record{
            {
                Data: buf.Bytes(),
            },
        },
    })
    if err != nil {
        return err
    }
    if resp != nil {
        b, err := json.Marshal(resp)
        if err != nil {
            return err
        }
        fmt.Println(string(b))
    }
    return nil
}
 
func main() {
    lambda.Start(handleRequest)
}

基本はこの程度の記述で OK です。まず init() でグローバルに aws.Config をセットアップします。init() を使うことで、 Lambda 実行環境内でコールドスタート時に設定された aws.Config が複数の呼び出しで再利用され、効率が上がります。ウォームスタート時は init() は呼ばれません。

1
2
3
4
5
6
7
8
9
var cfg aws.Config
 
func init() {
    var err error
    cfg, err = config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatalf("cannot load aws sdk config: %v", err)
    }
}

handleRequest 関数ではまずログパーサーを初期化します。

1
2
buf := &bytes.Buffer{}
p := parser.NewALBRegexParser(ctx, buf, parser.Option{})

これだけで、ALB のアクセスログを解析するための正規表現がプリセットされた parser.RegexParser が初期化されます。内部では以下のようになっています。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
func NewALBRegexParser(ctx context.Context, writer io.Writer, opt Option) *RegexParser {
    p := &RegexParser{
        ctx:     ctx,
        writer:  writer,
        decoder: regexLineDecoder,
        opt:     opt,
        patterns: []*regexp.Regexp{
            regexp.MustCompile(`^(?P<type>[!-~]+) (?P<time>[!-~]+) (?P<elb>[!-~]+) (?P<client_port>[!-~]+) (?P<target_port>[!-~]+) (?P<request_processing_time>[\d\-.]+) (?P<target_processing_time>[\d\-.]+) (?P<response_processing_time>[\d\-.]+) (?P<elb_status_code>\d{1,3}|-) (?P<target_status_code>\d{1,3}|-) (?P<received_bytes>[\d\-.]+) (?P<sent_bytes>[\d\-.]+) \"(?P<method>[A-Z\-]+) (?P<request_uri>[^ \"]+) (?P<protocol>HTTP/[0-9.]+|-|-)\" "(?P<user_agent>[^\"]*)" (?P<ssl_cipher>[!-~]+) (?P<ssl_protocol>[!-~]+) (?P<target_group_arn>[!-~]+) "(?P<trace_id>[ -~]+)" "(?P<domain_name>[ -~]+)" "(?P<chosen_cert_arn>[ -~]+)" (?P<matched_rule_priority>[!-~]+) (?P<request_creation_time>[!-~]+) "(?P<actions_executed>[ -~]+)" "(?P<redirect_url>[ -~]+)" "(?P<error_reason>[ -~]+)" "(?P<target_port_list>[ -~]+)" "(?P<target_status_code_list>[ -~]+)" "(?P<classification>[ -~]+)" "(?P<classification_reason>[ -~]+)"`),
        },
    }
    if opt.LineHandler == nil {
        p.opt.LineHandler = JSONLineHandler
    }
    return p
}

引数は以下の通りです。

引数 説明
ctx context.Context 主に os.Interrupt シグナルを伝播させるためのコンテキスト
tail -f で解析中、CTRL+C で中断しても整合性のある parser.Result を返すことができる
writer io.Writer 出力先。今回は解凍および加工したログをバッファに出力する
opt parser.Option さまざまなオプション設定を構造体で渡す

オプションは以下のような構造体です。今回は使っていませんが、多機能なので紹介します。

1
2
3
4
5
6
7
8
9
type Option struct {
    Labels       []string
    Filters      []string
    SkipLines    []int
    Prefix       bool
    UnmatchLines bool
    LineNumber   bool
    LineHandler  LineHandler
}
フィールド 説明
Labels ラベル名で列を絞り込む
Filters size < 100 method == GET remote_host =~ ^192.168. のようなフィルタ式で行を絞り込む
SkipLines 行番号を渡すことでその行を除外する。ヘッダー除外などで有用
Prefix ログに [ PROCESSED | UNMATCHED ] のような接頭辞を付与
UnmatchLines マッチしなかったログを未加工のまま出すようにする。Prefix と併用することで視覚的にわかりやすくできる
LineNumber 変換後のログの先頭に行番号を付与
LineHandler 行の変換処理をカスタム関数で渡せる。デフォルトは NDJSON。LTSV などいろいろ用意している

本筋に戻ります。S3 と Firehose の API を使うので、クライアントを初期化します。

1
2
s3client := s3.NewFromConfig(cfg)
firehoseClient := firehose.NewFromConfig(cfg)

S3 イベントが配列なので、レコードをイテレーションする中で以下の流れで処理します。

まず S3 の GetObject で対象のオブジェクトを取得します。

1
2
3
4
5
6
7
obj, err := s3client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(record.S3.Bucket.Name),
    Key:    aws.String(record.S3.Object.Key),
})
if err != nil {
    return err
}

取得した Body は GZIP 圧縮されているので、gzip.NewReader で解凍します。

1
2
3
4
5
r, err := gzip.NewReader(obj.Body)
if err != nil {
    return err
}
defer r.Close()

parser.RegexParser には io.Reader を引数にとる Parse メソッドがあるので解凍した Body をそのまま渡せます。parser.NewALBRegexParser の第 2 引数で設定した buf に変換後のログが放流されます。

1
2
3
4
result, err := p.Parse(r)
if err != nil {
    return err
}

result には集計結果や解析できなかったログが入っているので、JSON に変換して標準出力に出すことで CloudWatch Logs に拾わせます。

1
2
3
4
5
b, err := json.Marshal(result)
if err != nil {
    return err
}
fmt.Println(string(b))

余談ですが result は parser.Result 型で、以下のような構造体です。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
type Result struct {
    Total       int           `json:"total"`
    Matched     int           `json:"matched"`
    Unmatched   int           `json:"unmatched"`
    Excluded    int           `json:"excluded"`
    Skipped     int           `json:"skipped"`
    ElapsedTime time.Duration `json:"elapsedTime"`
    Source      string        `json:"source"`
    ZipEntries  []string      `json:"zipEntries,omitempty"`
    Errors      []Errors      `json:"errors"`
    inputType   inputType     `json:"-"`
}
 
type Errors struct {
    Entry      string `json:"entry,omitempty"`
    LineNumber int    `json:"lineNumber"`
    Line       string `json:"line"`
}

これは fmt.Stringer を実装しており、fmt.Println() に渡すと String() で定義された文字列表現が表示されます。以下のような形です。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* SUMMARY */
 
+-------+---------+-----------+----------+---------+-------------+--------------------------------+
| Total | Matched | Unmatched | Excluded | Skipped | ElapsedTime | Source                         |
+-------+---------+-----------+----------+---------+-------------+--------------------------------+
|     5 |       4 |         1 |        0 |       0 | 1.16375ms   | sample_s3_contains_unmatch.log |
+-------+---------+-----------+----------+---------+-------------+--------------------------------+
 
Total     : Total number of log line processed
Matched   : Number of log line that successfully matched pattern
Unmatched : Number of log line that did not match any pattern
Excluded  : Number of log line that did not extract by filter expressions
Skipped   : Number of log line that skipped by line number
 
/* UNMATCH LINES */
 
+------------+------------------------------------------------------------------------------------------------------+
| LineNumber | Line                                                                                                 |
+------------+------------------------------------------------------------------------------------------------------+
|          4 | d45e67fa89b012c3a45678901b234c56d78a90f12b3456789a012345c6789d01 awsrandombucket89 [03/Feb/2019:03:5 |
|            | 4:33 +0000] 192.0.2.76 d45e67fa89b012c3a45678901b234c56d78a90f12b3456789a012345c6789d01 7B4A0FABBEXA |
|            | MPLE REST.GET.VERSIONING - "GET /awsrandombucket89?versioning HTTP/1.1" 200 - 113 - 33 - "-" "S3Cons |
|            | ole/0.4"                                                                                             |
+------------+------------------------------------------------------------------------------------------------------+
 
LineNumber : Line number of the log that did not match any pattern
Line       : Raw log line that did not match any pattern

手元で使うには便利ですが再利用性はないので、JSON にシリアライズしてから logs に送り込む選択をしました。

Firehose に PUT する前に、バッファが空かどうか確認します。ログ行がすべてアンマッチであればここでひっかかります。

1
2
3
if buf.Len() == 0 {
    return fmt.Errorf("abort process because buffer is empty")
}

最後に、変換したログを Firehose に PUT します。ストリーム名は環境変数から読み込み、レコードには buf をバイト列に変換して渡します。レスポンスは JSON に変換して CloudWatch logs に出力します。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{
    DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")),
    Records: []types.Record{
        {
            Data: buf.Bytes(),
        },
    },
})
if err != nil {
    return err
}
if resp != nil {
    b, err := json.Marshal(resp)
    if err != nil {
        return err
    }
    fmt.Println(string(b))
}
return nil

あとはこの関数を main 内で lambda に渡します。

1
2
3
func main() {
    lambda.Start(handleRequest)
}

動作確認

EC2 のユーザーデータで httpd を仕込んであるので、適当にブラウザアクセスしてみます。CloudWatch logs を見ると以下のような感じで出力されていました。ログパーサーの解析結果と PutRecordBatch API の結果を別々に確認できるので、トラブルシューティングしやすいと思います。

ログパーサーの結果

01
02
03
04
05
06
07
08
09
10
{
    "total": 4,
    "matched": 4,
    "unmatched": 0,
    "excluded": 0,
    "skipped": 0,
    "elapsedTime": 199721,
    "source": "",
    "errors": []
}

PutRecordBatch API の結果

01
02
03
04
05
06
07
08
09
10
11
12
{
    "FailedPutCount": 0,
    "RequestResponses": [
        {
            "ErrorCode": null,
            "ErrorMessage": null,
            "RecordId": "xxxx"
        }
    ],
    "Encrypted": false,
    "ResultMetadata": {}
}

行き先バケットのログをみてみると、NDJSON 形式に変換されたログが問題なく配置されていました (マスクしています)

1
2
3
4
5
{"type":"http","time":"2024-03-14T01:12:44.329452Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"0.1.2.3:37749","target_port":"-","request_processing_time":"-1","target_processing_time":"-1","response_processing_time":"-1","elb_status_code":"301","target_status_code":"-","received_bytes":"37","sent_bytes":"331","method":"GET","request_uri":"http://9.8.7.6:80/","protocol":"HTTP/1.1","user_agent":"-","ssl_cipher":"-","ssl_protocol":"-","target_group_arn":"-","trace_id":"Root=Root=1-00000000-111111111122222222220000","domain_name":"-","chosen_cert_arn":"-","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:12:44.329000Z","actions_executed":"redirect","redirect_url":"https://9.8.7.6:443/","error_reason":"-","target_port_list":"-","target_status_code_list":"-","classification":"-","classification_reason":"-"}
{"type":"https","time":"2024-03-14T01:17:15.285828Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:43430","target_port":"10.0.2.249:80","request_processing_time":"0.001","target_processing_time":"0.001","response_processing_time":"0.000","elb_status_code":"200","target_status_code":"200","received_bytes":"247","sent_bytes":"294","method":"GET","request_uri":"https://9.8.7.6:443/","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222223333","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:15.284000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.2.249:80","target_status_code_list":"200","classification":"-","classification_reason":"-"}
{"type":"https","time":"2024-03-14T01:17:35.099409Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:50752","target_port":"10.0.3.34:80","request_processing_time":"0.002","target_processing_time":"0.002","response_processing_time":"0.000","elb_status_code":"404","target_status_code":"404","received_bytes":"414","sent_bytes":"375","method":"GET","request_uri":"https://9.8.7.6:443/favicon.ico","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222224444","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:35.095000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.3.34:80","target_status_code_list":"404","classification":"-","classification_reason":"-"}
{"type":"https","time":"2024-03-14T01:17:36.733786Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:50760","target_port":"10.0.2.249:80","request_processing_time":"0.001","target_processing_time":"0.000","response_processing_time":"0.000","elb_status_code":"404","target_status_code":"404","received_bytes":"413","sent_bytes":"375","method":"GET","request_uri":"https://9.8.7.6:443/robots.txt","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222225555","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:36.732000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.2.249:80","target_status_code_list":"404","classification":"-","classification_reason":"-"}
{"type":"https","time":"2024-03-14T01:17:38.346061Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:50772","target_port":"10.0.3.34:80","request_processing_time":"0.000","target_processing_time":"0.002","response_processing_time":"0.000","elb_status_code":"404","target_status_code":"404","received_bytes":"414","sent_bytes":"375","method":"GET","request_uri":"https://9.8.7.6:443/sitemap.xml","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222226666","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:38.343000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.3.34:80","target_status_code_list":"404","classification":"-","classification_reason":"-"}

再利用性

このコードは、少し変更するだけで ALB の他にも NLB, CLB, CloudFront, S3 で使えます。それぞれ専用の正規表現がプリセットされたコンストラクタ (というか初期化関数) を用意しているので、parser.NewALBRegexParser を以下のいずれかに変更するだけです。このことから再利用性が高いと言えます。

  • parser.NewNLBRegexPaser
  • parser.NewCLBRegexParser
  • parser.NewCFRegexParser
  • parser.NewS3RegexParser

注意点

今回は簡易構成なので、大規模環境ではそのまま使えません。以下に注意してください。

SQS の利用

S3 トリガーから直接 Lambda を呼び出していますが、大規模な環境では SQS を挟むなど、非同期アーキテクチャを意識した設計を検討してください。

PutRecordBatch API の制限

Amazon Data Firehose の PutRecordBatch API には以下のような制限があります。今回の Lambda 関数では、この制限を超過しないかを追跡してデータを分割するようなロジックは入れていないことに注意してください (ドキュメント)

The PutRecordBatch operation can take up to 500 records per call or 4 MiB per call, whichever is smaller. This quota cannot be changed.
(訳) PutRecordBatch オペレーションは、1 コールあたり 500 レコードまたは 4MiB のいずれか小さい方まで取ることができる。このクォータは変更できない。

PutRecordBatch API のレスポンス

今回は PutRecordBatch API の戻り値を logs に出力していますが、レスポンスには失敗情報も格納されるので、本番ワークロードではレスポンスをチェックしてリトライする処理も場合によっては必要になるかもしれません。FailedPutCount で処理に失敗した可能性のあるレコードの数がわかるので、これが 1 以上だったら対象のレコードをリトライするような機能を実装すればよいと思います。

おわりに

簡易的な構成ではありますが、自作ログパーサーの使い方を紹介しながら、ログを集約する環境を簡単かつ効率的に構築できることを確認しました。この延長でもっと本格的な DWH も設計および構築してみたいです。

大規模なログ集約を考える場合、S3 バケットにログを PUT する際にリソース名をプレフィックスに使ったり、あるいはログの先頭にリソース名を付与したりといった、ログの識別方式も重要になってくると思います。今回使ったログパーサーでは変換ロジックを外部から注入できるので、このへんもクリアできそうではあります。引き続き手を動かしてみたいと思います。