はじめに
以前、ログパーサーを Go で自作してみた件をブログに書きました。AWS 環境で使うことを前提に作ったので、さまざまな AWS リソースのログ解析機能がプリセットされています。
これも地味に改善を続けていまして、現在では以下のような機能が加わり、使い勝手がよくなっています (半分宣伝)
- ラベル名でカラムの絞り込みをできるようにした
size < 100
method == GET
remote_host =~ ^192.168.
のようなフィルタ式で行を絞り込めるようにしたtail -f
の出力をパイプから流し込めるようにした (リアルタイム解析できるようにした)bufio.Scanner
からio.Writer
に流し込むように設計を変更し、メモリにやさしくパフォーマンスも改善
このツール、CLI として配布していますがコア機能はすべて Go のモジュールとして再利用可能です。
今回は AWS でのログ解析ワークロードがこのモジュールでどれだけ簡単/効率的に実装できるかを検証しました。焦点としては Lambda の記述をいかに軽くできるかになりますが、かなり簡素化できましたのでぜひご覧ください。
概要
CDK を使い、以下のような簡易構成を作ります。
構築
CDK で各リソースを作ります。リポジトリはこちらです。
Construct を以下のように分けています。
. ├── bin : └── main.ts ├── lib │ ├── constructs │ │ ├── handler.ts │ │ ├── network.ts │ │ ├── service.ts │ │ └── stream.ts : └── stack.ts ├── src │ ├── ec2 │ │ └── userdata.sh │ └── lambda │ └── logparser │ ├── go.mod │ ├── go.sum : └── main.go
Network
VPC の作成です。特に変わったことはしていないので割愛します。ソースはこちら。
Service
以下を作っています。ソースはこちら。
- EC2 関連リソース
- ロール
- セキュリティグループ
- 起動テンプレート
- Auto Scaling グループ
- ALB 関連リソース
- セキュリティグループ
- HTTP リスナー
- HTTPS リスナー
- ターゲットグループ
- ACM 証明書
- ログバケット
- EC2 Instance Connect Endpoint (踏み台用)
- ALB エイリアスレコード
Stream
Amazon Data Firehose デリバリーストリームを作ります。以下のようなコードにしました。今回は S3 トリガーでログの変換を行うので、Lambda 関数は統合していません。
import * as cdk from "aws-cdk-lib"; import { ApplicationLoadBalancer } from "aws-cdk-lib/aws-elasticloadbalancingv2"; import { Construct } from "constructs"; export interface StreamProps { serviceName: string; alb: ApplicationLoadBalancer; } export class Stream extends Construct { readonly deliveryStream: cdk.aws_kinesisfirehose.CfnDeliveryStream; constructor(scope: Construct, id: string, props: StreamProps) { super(scope, id); const stack = cdk.Stack.of(this); // Create S3 bucket for transformed logs const dstBucket = new cdk.aws_s3.Bucket(this, "DestinationBucket", { bucketName: `${props.serviceName}-destination`, blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL, publicReadAccess: false, encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED, enforceSSL: true, removalPolicy: cdk.RemovalPolicy.DESTROY, autoDeleteObjects: true, versioned: false, objectOwnership: cdk.aws_s3.ObjectOwnership.BUCKET_OWNER_PREFERRED, }); // Create role for firehose stream const firehoseRole = new cdk.aws_iam.Role(this, "FirehoseRole", { roleName: `${props.serviceName}-firehose-role`, assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"), inlinePolicies: { ["FirehoseRoleAdditionalPolicy"]: new cdk.aws_iam.PolicyDocument({ statements: [ new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject", ], resources: [dstBucket.bucketArn, `${dstBucket.bucketArn}/*`], }), new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["logs:PutLogEvents"], resources: [`arn:aws:logs:${stack.region}:${stack.account}:log-group:/aws/kinesisfirehose/*`], }), ], }), }, }); // Create log group and stream for firehose error logs const firehoseFailLogGroup = new cdk.aws_logs.LogGroup(this, "FirehoseFailLogGroup", { logGroupName: `/aws/kinesisfirehose/${props.serviceName}-firehose/fail`, removalPolicy: cdk.RemovalPolicy.DESTROY, retention: cdk.aws_logs.RetentionDays.THREE_DAYS, }); const firehoseFailLogStream = new cdk.aws_logs.LogStream(this, "FirehoseFailLogStream", { logGroup: firehoseFailLogGroup, logStreamName: "logs", }); // Create firehose delivery stream this.deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(this, "Firehose", { deliveryStreamName: `${props.serviceName}-firehose`, deliveryStreamType: "DirectPut", s3DestinationConfiguration: { bucketArn: dstBucket.bucketArn, roleArn: firehoseRole.roleArn, cloudWatchLoggingOptions: { enabled: true, logGroupName: firehoseFailLogGroup.logGroupName, logStreamName: "logs", }, compressionFormat: "GZIP", errorOutputPrefix: "/errors", bufferingHints: { sizeInMBs: 5, intervalInSeconds: 300, }, }, }); this.deliveryStream._addResourceDependency(dstBucket.node.defaultChild as cdk.CfnResource); this.deliveryStream._addResourceDependency(firehoseFailLogGroup.node.defaultChild as cdk.CfnResource); this.deliveryStream._addResourceDependency(firehoseFailLogStream.node.defaultChild as cdk.CfnResource); } }
Handler
Lambda のリソース設定です。Go なので、ランタイムは Amazon Linux 2 を選択し、ローカルファイルから Docker でビルドします。このあたりは以前の記事を参考にしてください。
import * as cdk from "aws-cdk-lib"; import { Construct } from "constructs"; export interface HandlerProps { serviceName: string; albLogBucket: cdk.aws_s3.Bucket; deliveryStream: cdk.aws_kinesisfirehose.CfnDeliveryStream; } export class Handler extends Construct { constructor(scope: Construct, id: string, props: HandlerProps) { super(scope, id); const stack = cdk.Stack.of(this); // Create role for lambda function const logParserRole = new cdk.aws_iam.Role(this, "LogParserRole", { roleName: `${props.serviceName}-logparser-role`, assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"), inlinePolicies: { ["HandlerLogParserRoleAdditionalPolicy"]: new cdk.aws_iam.PolicyDocument({ statements: [ new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"], resources: ["arn:aws:logs:*:*:*"], }), new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["firehose:DescribeStream", "firehose:GetRecords", "firehose:PutRecordBatch"], resources: [`arn:aws:firehose:${stack.region}:${stack.account}:deliverystream/*`], }), new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["s3:GetObject", "s3:GetBucketLocation", "s3:ListBucket"], resources: [`arn:aws:s3:::*`], }), ], }), }, }); // Create DLQ const logParserDLQ = new cdk.aws_sqs.Queue(this, "LogParserQueue", { queueName: `${props.serviceName}-logparser-dlq`, encryption: cdk.aws_sqs.QueueEncryption.SQS_MANAGED, enforceSSL: true, removalPolicy: cdk.RemovalPolicy.DESTROY, retentionPeriod: cdk.Duration.days(7), }); // Create lambda function const logParser = new cdk.aws_lambda.Function(this, "LogParser", { functionName: `${props.serviceName}-logparser`, description: "Parsing ALB logs and put to firehose", code: cdk.aws_lambda.Code.fromAsset("src/lambda/logparser", { bundling: { image: cdk.DockerImage.fromRegistry("golang:1.22.0"), command: [ "bash", "-c", [ "export GOCACHE=/tmp/go-cache", "export GOPATH=/tmp/go-path", "CGO_ENABLED=0 GOOS=linux go build -tags lambda.norpc -o /asset-output/bootstrap main.go", ].join(" && "), ], }, }), handler: "bootstrap", architecture: cdk.aws_lambda.Architecture.ARM_64, runtime: cdk.aws_lambda.Runtime.PROVIDED_AL2, role: logParserRole, logRetention: cdk.aws_logs.RetentionDays.THREE_DAYS, currentVersionOptions: { removalPolicy: cdk.RemovalPolicy.RETAIN, }, deadLetterQueueEnabled: true, deadLetterQueue: logParserDLQ, timeout: cdk.Duration.minutes(5), reservedConcurrentExecutions: 1, retryAttempts: 2, environment: { FIREHOSE_STREAM_NAME: props.deliveryStream.deliveryStreamName!, }, }); // Update function alias const alias = new cdk.aws_lambda.Alias(scope, "LogParserAlias", { aliasName: "live", version: logParser.currentVersion, }); // Set function alias to S3 event props.albLogBucket.addEventNotification( cdk.aws_s3.EventType.OBJECT_CREATED_PUT, new cdk.aws_s3_notifications.LambdaDestination(alias) ); } }
環境変数で Firehose のストリーム名を渡しています。
environment: { FIREHOSE_STREAM_NAME: props.deliveryStream.deliveryStreamName!, },
あとあとのことを考えて、エイリアスを使うようにしておいたほうがよいでしょう。
// Update function alias const alias = new cdk.aws_lambda.Alias(scope, "LogParserAlias", { aliasName: "live", version: logParser.currentVersion, });
最後に、props で渡したログバケットに対して S3 イベントを設定します。
// Set function alias to S3 event props.albLogBucket.addEventNotification( cdk.aws_s3.EventType.OBJECT_CREATED_PUT, new cdk.aws_s3_notifications.LambdaDestination(alias) );
ここまでで定義したリソースをデプロイし、環境が構築されたことを確認します。
cdk synth cdk deploy
ログ解析関数
この記事のメインである Lambda 関数です。
package main import ( "bytes" "compress/gzip" "context" "encoding/json" "fmt" "log" "os" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/firehose" "github.com/aws/aws-sdk-go-v2/service/firehose/types" "github.com/aws/aws-sdk-go-v2/service/s3" parser "github.com/nekrassov01/access-log-parser" ) var cfg aws.Config func init() { var err error cfg, err = config.LoadDefaultConfig(context.Background()) if err != nil { log.Fatalf("cannot load aws sdk config: %v", err) } } func handleRequest(ctx context.Context, event events.S3Event) error { buf := &bytes.Buffer{} p := parser.NewALBRegexParser(ctx, buf, parser.Option{}) s3client := s3.NewFromConfig(cfg) firehoseClient := firehose.NewFromConfig(cfg) for _, record := range event.Records { obj, err := s3client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(record.S3.Bucket.Name), Key: aws.String(record.S3.Object.Key), }) if err != nil { return err } r, err := gzip.NewReader(obj.Body) if err != nil { return err } defer r.Close() result, err := p.Parse(r) if err != nil { return err } b, err := json.Marshal(result) if err != nil { return err } fmt.Println(string(b)) } if buf.Len() == 0 { return fmt.Errorf("abort process because buffer is empty") } resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{ DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")), Records: []types.Record{ { Data: buf.Bytes(), }, }, }) if err != nil { return err } if resp != nil { b, err := json.Marshal(resp) if err != nil { return err } fmt.Println(string(b)) } return nil } func main() { lambda.Start(handleRequest) }
基本はこの程度の記述で OK です。まず init()
でグローバルに aws.Config
をセットアップします。init()
を使うことで、 Lambda 実行環境内でコールドスタート時に設定された aws.Config
が複数の呼び出しで再利用され、効率が上がります。ウォームスタート時は init()
は呼ばれません。
var cfg aws.Config func init() { var err error cfg, err = config.LoadDefaultConfig(context.Background()) if err != nil { log.Fatalf("cannot load aws sdk config: %v", err) } }
handleRequest
関数ではまずログパーサーを初期化します。
buf := &bytes.Buffer{} p := parser.NewALBRegexParser(ctx, buf, parser.Option{})
これだけで、ALB のアクセスログを解析するための正規表現がプリセットされた parser.RegexParser
が初期化されます。内部では以下のようになっています。
func NewALBRegexParser(ctx context.Context, writer io.Writer, opt Option) *RegexParser { p := &RegexParser{ ctx: ctx, writer: writer, decoder: regexLineDecoder, opt: opt, patterns: []*regexp.Regexp{ regexp.MustCompile(`^(?P<type>[!-~]+) (?P<time>[!-~]+) (?P<elb>[!-~]+) (?P<client_port>[!-~]+) (?P<target_port>[!-~]+) (?P<request_processing_time>[\d\-.]+) (?P<target_processing_time>[\d\-.]+) (?P<response_processing_time>[\d\-.]+) (?P<elb_status_code>\d{1,3}|-) (?P<target_status_code>\d{1,3}|-) (?P<received_bytes>[\d\-.]+) (?P<sent_bytes>[\d\-.]+) \"(?P<method>[A-Z\-]+) (?P<request_uri>[^ \"]+) (?P<protocol>HTTP/[0-9.]+|-|-)\" "(?P<user_agent>[^\"]*)" (?P<ssl_cipher>[!-~]+) (?P<ssl_protocol>[!-~]+) (?P<target_group_arn>[!-~]+) "(?P<trace_id>[ -~]+)" "(?P<domain_name>[ -~]+)" "(?P<chosen_cert_arn>[ -~]+)" (?P<matched_rule_priority>[!-~]+) (?P<request_creation_time>[!-~]+) "(?P<actions_executed>[ -~]+)" "(?P<redirect_url>[ -~]+)" "(?P<error_reason>[ -~]+)" "(?P<target_port_list>[ -~]+)" "(?P<target_status_code_list>[ -~]+)" "(?P<classification>[ -~]+)" "(?P<classification_reason>[ -~]+)"`), }, } if opt.LineHandler == nil { p.opt.LineHandler = JSONLineHandler } return p }
引数は以下の通りです。
引数 | 説明 |
---|---|
ctx context.Context | 主に os.Interrupt シグナルを伝播させるためのコンテキストtail -f で解析中、CTRL+C で中断しても整合性のある parser.Result を返すことができる |
writer io.Writer | 出力先。今回は解凍および加工したログをバッファに出力する |
opt parser.Option | さまざまなオプション設定を構造体で渡す |
オプションは以下のような構造体です。今回は使っていませんが、多機能なので紹介します。
type Option struct { Labels []string Filters []string SkipLines []int Prefix bool UnmatchLines bool LineNumber bool LineHandler LineHandler }
フィールド | 説明 |
---|---|
Labels | ラベル名で列を絞り込む |
Filters | size < 100 method == GET remote_host =~ ^192.168. のようなフィルタ式で行を絞り込む |
SkipLines | 行番号を渡すことでその行を除外する。ヘッダー除外などで有用 |
Prefix | ログに [ PROCESSED | UNMATCHED ] のような接頭辞を付与 |
UnmatchLines | マッチしなかったログを未加工のまま出すようにする。Prefix と併用することで視覚的にわかりやすくできる |
LineNumber | 変換後のログの先頭に行番号を付与 |
LineHandler | 行の変換処理をカスタム関数で渡せる。デフォルトは NDJSON。LTSV などいろいろ用意している |
本筋に戻ります。S3 と Firehose の API を使うので、クライアントを初期化します。
s3client := s3.NewFromConfig(cfg) firehoseClient := firehose.NewFromConfig(cfg)
S3 イベントが配列なので、レコードをイテレーションする中で以下の流れで処理します。
まず S3 の GetObject で対象のオブジェクトを取得します。
obj, err := s3client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(record.S3.Bucket.Name), Key: aws.String(record.S3.Object.Key), }) if err != nil { return err }
取得した Body は GZIP 圧縮されているので、gzip.NewReader
で解凍します。
r, err := gzip.NewReader(obj.Body) if err != nil { return err } defer r.Close()
parser.RegexParser
には io.Reader
を引数にとる Parse
メソッドがあるので解凍した Body をそのまま渡せます。parser.NewALBRegexParser
の第 2 引数で設定した buf
に変換後のログが放流されます。
result, err := p.Parse(r) if err != nil { return err }
result
には集計結果や解析できなかったログが入っているので、JSON に変換して標準出力に出すことで CloudWatch Logs に拾わせます。
b, err := json.Marshal(result) if err != nil { return err } fmt.Println(string(b))
余談ですが result は parser.Result
型で、以下のような構造体です。
type Result struct { Total int `json:"total"` Matched int `json:"matched"` Unmatched int `json:"unmatched"` Excluded int `json:"excluded"` Skipped int `json:"skipped"` ElapsedTime time.Duration `json:"elapsedTime"` Source string `json:"source"` ZipEntries []string `json:"zipEntries,omitempty"` Errors []Errors `json:"errors"` inputType inputType `json:"-"` } type Errors struct { Entry string `json:"entry,omitempty"` LineNumber int `json:"lineNumber"` Line string `json:"line"` }
これは fmt.Stringer
を実装しており、fmt.Println()
に渡すと String()
で定義された文字列表現が表示されます。以下のような形です。
/* SUMMARY */ +-------+---------+-----------+----------+---------+-------------+--------------------------------+ | Total | Matched | Unmatched | Excluded | Skipped | ElapsedTime | Source | +-------+---------+-----------+----------+---------+-------------+--------------------------------+ | 5 | 4 | 1 | 0 | 0 | 1.16375ms | sample_s3_contains_unmatch.log | +-------+---------+-----------+----------+---------+-------------+--------------------------------+ Total : Total number of log line processed Matched : Number of log line that successfully matched pattern Unmatched : Number of log line that did not match any pattern Excluded : Number of log line that did not extract by filter expressions Skipped : Number of log line that skipped by line number /* UNMATCH LINES */ +------------+------------------------------------------------------------------------------------------------------+ | LineNumber | Line | +------------+------------------------------------------------------------------------------------------------------+ | 4 | d45e67fa89b012c3a45678901b234c56d78a90f12b3456789a012345c6789d01 awsrandombucket89 [03/Feb/2019:03:5 | | | 4:33 +0000] 192.0.2.76 d45e67fa89b012c3a45678901b234c56d78a90f12b3456789a012345c6789d01 7B4A0FABBEXA | | | MPLE REST.GET.VERSIONING - "GET /awsrandombucket89?versioning HTTP/1.1" 200 - 113 - 33 - "-" "S3Cons | | | ole/0.4" | +------------+------------------------------------------------------------------------------------------------------+ LineNumber : Line number of the log that did not match any pattern Line : Raw log line that did not match any pattern
手元で使うには便利ですが再利用性はないので、JSON にシリアライズしてから logs に送り込む選択をしました。
Firehose に PUT する前に、バッファが空かどうか確認します。ログ行がすべてアンマッチであればここでひっかかります。
if buf.Len() == 0 { return fmt.Errorf("abort process because buffer is empty") }
最後に、変換したログを Firehose に PUT します。ストリーム名は環境変数から読み込み、レコードには buf
をバイト列に変換して渡します。レスポンスは JSON に変換して CloudWatch logs に出力します。
resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{ DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")), Records: []types.Record{ { Data: buf.Bytes(), }, }, }) if err != nil { return err } if resp != nil { b, err := json.Marshal(resp) if err != nil { return err } fmt.Println(string(b)) } return nil
あとはこの関数を main 内で lambda に渡します。
func main() { lambda.Start(handleRequest) }
動作確認
EC2 のユーザーデータで httpd を仕込んであるので、適当にブラウザアクセスしてみます。CloudWatch logs を見ると以下のような感じで出力されていました。ログパーサーの解析結果と PutRecordBatch API の結果を別々に確認できるので、トラブルシューティングしやすいと思います。
ログパーサーの結果
{ "total": 4, "matched": 4, "unmatched": 0, "excluded": 0, "skipped": 0, "elapsedTime": 199721, "source": "", "errors": [] }
PutRecordBatch API の結果
{ "FailedPutCount": 0, "RequestResponses": [ { "ErrorCode": null, "ErrorMessage": null, "RecordId": "xxxx" } ], "Encrypted": false, "ResultMetadata": {} }
行き先バケットのログをみてみると、NDJSON 形式に変換されたログが問題なく配置されていました (マスクしています)
{"type":"http","time":"2024-03-14T01:12:44.329452Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"0.1.2.3:37749","target_port":"-","request_processing_time":"-1","target_processing_time":"-1","response_processing_time":"-1","elb_status_code":"301","target_status_code":"-","received_bytes":"37","sent_bytes":"331","method":"GET","request_uri":"http://9.8.7.6:80/","protocol":"HTTP/1.1","user_agent":"-","ssl_cipher":"-","ssl_protocol":"-","target_group_arn":"-","trace_id":"Root=Root=1-00000000-111111111122222222220000","domain_name":"-","chosen_cert_arn":"-","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:12:44.329000Z","actions_executed":"redirect","redirect_url":"https://9.8.7.6:443/","error_reason":"-","target_port_list":"-","target_status_code_list":"-","classification":"-","classification_reason":"-"} {"type":"https","time":"2024-03-14T01:17:15.285828Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:43430","target_port":"10.0.2.249:80","request_processing_time":"0.001","target_processing_time":"0.001","response_processing_time":"0.000","elb_status_code":"200","target_status_code":"200","received_bytes":"247","sent_bytes":"294","method":"GET","request_uri":"https://9.8.7.6:443/","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222223333","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:15.284000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.2.249:80","target_status_code_list":"200","classification":"-","classification_reason":"-"} {"type":"https","time":"2024-03-14T01:17:35.099409Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:50752","target_port":"10.0.3.34:80","request_processing_time":"0.002","target_processing_time":"0.002","response_processing_time":"0.000","elb_status_code":"404","target_status_code":"404","received_bytes":"414","sent_bytes":"375","method":"GET","request_uri":"https://9.8.7.6:443/favicon.ico","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222224444","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:35.095000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.3.34:80","target_status_code_list":"404","classification":"-","classification_reason":"-"} {"type":"https","time":"2024-03-14T01:17:36.733786Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:50760","target_port":"10.0.2.249:80","request_processing_time":"0.001","target_processing_time":"0.000","response_processing_time":"0.000","elb_status_code":"404","target_status_code":"404","received_bytes":"413","sent_bytes":"375","method":"GET","request_uri":"https://9.8.7.6:443/robots.txt","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222225555","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:36.732000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.2.249:80","target_status_code_list":"404","classification":"-","classification_reason":"-"} {"type":"https","time":"2024-03-14T01:17:38.346061Z","elb":"app/kawashima-test-alb/xxxxxxxxxxxxxxxx","client_port":"2.3.4.5:50772","target_port":"10.0.3.34:80","request_processing_time":"0.000","target_processing_time":"0.002","response_processing_time":"0.000","elb_status_code":"404","target_status_code":"404","received_bytes":"414","sent_bytes":"375","method":"GET","request_uri":"https://9.8.7.6:443/sitemap.xml","protocol":"HTTP/1.1","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","ssl_cipher":"TLS_AES_128_GCM_SHA256","ssl_protocol":"TLSv1.3","target_group_arn":"arn:aws:elasticloadbalancing:ap-northeast-1:000000000000:targetgroup/kawashima-test-alb-tg/zzzzzzzzzzzzzzzz","trace_id":"Root=1-00000000-111111111122222222226666","domain_name":"-","chosen_cert_arn":"arn:aws:acm:ap-northeast-1:000000000000:certificate/12345678-1234-1234-1234-123456789012","matched_rule_priority":"0","request_creation_time":"2024-03-14T01:17:38.343000Z","actions_executed":"forward","redirect_url":"-","error_reason":"-","target_port_list":"10.0.3.34:80","target_status_code_list":"404","classification":"-","classification_reason":"-"}
再利用性
このコードは、少し変更するだけで ALB の他にも NLB, CLB, CloudFront, S3 で使えます。それぞれ専用の正規表現がプリセットされたコンストラクタ (というか初期化関数) を用意しているので、parser.NewALBRegexParser
を以下のいずれかに変更するだけです。このことから再利用性が高いと言えます。
parser.NewNLBRegexPaser
parser.NewCLBRegexParser
parser.NewCFRegexParser
parser.NewS3RegexParser
注意点
今回は簡易構成なので、大規模環境ではそのまま使えません。以下に注意してください。
SQS の利用
S3 トリガーから直接 Lambda を呼び出していますが、大規模な環境では SQS を挟むなど、非同期アーキテクチャを意識した設計を検討してください。
PutRecordBatch API の制限
Amazon Data Firehose の PutRecordBatch API には以下のような制限があります。今回の Lambda 関数では、この制限を超過しないかを追跡してデータを分割するようなロジックは入れていないことに注意してください (ドキュメント)
The PutRecordBatch operation can take up to 500 records per call or 4 MiB per call, whichever is smaller. This quota cannot be changed.
(訳) PutRecordBatch オペレーションは、1 コールあたり 500 レコードまたは 4MiB のいずれか小さい方まで取ることができる。このクォータは変更できない。
PutRecordBatch API のレスポンス
今回は PutRecordBatch API の戻り値を logs に出力していますが、レスポンスには失敗情報も格納されるので、本番ワークロードではレスポンスをチェックしてリトライする処理も場合によっては必要になるかもしれません。FailedPutCount
で処理に失敗した可能性のあるレコードの数がわかるので、これが 1 以上だったら対象のレコードをリトライするような機能を実装すればよいと思います。
おわりに
簡易的な構成ではありますが、自作ログパーサーの使い方を紹介しながら、ログを集約する環境を簡単かつ効率的に構築できることを確認しました。この延長でもっと本格的な DWH も設計および構築してみたいです。
大規模なログ集約を考える場合、S3 バケットにログを PUT する際にリソース名をプレフィックスに使ったり、あるいはログの先頭にリソース名を付与したりといった、ログの識別方式も重要になってくると思います。今回使ったログパーサーでは変換ロジックを外部から注入できるので、このへんもクリアできそうではあります。引き続き手を動かしてみたいと思います。