はじめに

ログ解析についていろいろ書いてきました。並べてみると、ひとつのテーマを軸にだいぶ引っ張っています。

残すところログ基盤の構築ができればこのテーマにも終止符が打てそうです。

概要

以下のような構成を目指します。

構成

  • 各アクセスログを単一のバケットに集約し、プレフィックスにリソース名を設定して管理する
  • ログバケットへの PUT イベントを契機に SQS にイベントメッセージを溜める
  • イベントメッセージを Lambda 関数でポーリングし、以下のように処理する
    • ログ行にリソース種別、リソース名を追加
    • JSON 形式へ変換
    • PutRecordBatch API の制限を考慮してデータを分割しつつ Firehose に PUT
  • Firehose の動的パーティショニングでリソース種別ごとにプレフィックスを切った上で中間バケットに PUT
  • Glue ジョブで ETL 処理
  • Athena でクエリ
  • QuickSight で可視化

スコープ

本記事では構造化フォーマットに変換したログを中間バケットに集約するところまでをスコープとします。赤枠の部分です。


今回はこのように 3 レイヤーのデータ変換にしてみました。段階を踏むことでフローが明確になりますし、中間データが保持されることで必要に応じて様々な ETL 処理を適用できます。

バケット 説明
Log bucket 生ログを格納するバケット (今回)
Intermediate bucket リソース名やリソース種別を付与し、JSON 形式に変換したログを格納するバケット (今回)
Destination bucket Parquet 形式への変換や統合テーブルなど、より閲覧や分析に適した形に変換されたデータを格納するバケット (次回)

構築

リソースの構築は CDK で行います。リポジトリはこちらです。前回までの記事と重複する点は GitHub へのリンクのみ記載します。

ログ格納用バケット

まずは生ログの置き場所となる S3 バケットを作ります。

VPC

VPC を作ります。

EC2 Instance Connect Endpoint

踏み台用の EIC エンドポイントを作ります。

ログ送信元リソース

アクセスログの送信元となるリソースを作ります。CDK のコードサンプルとしても使えるかもしれません。

EC2 インスタンスでは UserData を利用して httpdtd-agent のセットアップをしています。S3 バケットのプレフィックスには変更される可能性のあるホスト名や IP アドレスではなく論理名がほしいので、instanceName (Name タグ) を設定するようにしました。UserData として設定される時点で変数を展開したいので td-agent.conf もインラインで書いています。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    const prefix = "web";
    this.instanceName = `${props.serviceName}-${prefix}-instance`;
    const userData = cdk.aws_ec2.UserData.forLinux({ shebang: "#!/bin/bash" });
    userData.addCommands(
      "# setup httpd",
      "sudo yum update -y",
      "sudo yum install -y httpd",
      "sudo systemctl start httpd",
      "sudo systemctl enable httpd",
      "sudo touch /var/www/html/index.html",
      'echo "Hello from httpd" | sudo tee -a /var/www/html/index.html',
      "",
      "# setup td-agent",
      "curl -L https://toolbelt.treasuredata.com/sh/install-amazon2-td-agent3.sh | sh",
      'sudo sed -i -e "s|^User=td-agent|User=root|g" /usr/lib/systemd/system/td-agent.service',
      'sudo sed -i -e "s|^Group=td-agent|Group=root|g" /usr/lib/systemd/system/td-agent.service',
      "sudo cp /etc/td-agent/td-agent.conf /etc/td-agent/td-agent.conf.bk",
      "",
      `CONFIG=$(
  cat <<EOF
<source>
  @type tail
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/access_log.pos
  tag access_log
  <parse>
    @type none
  </parse>
</source>
 
<match access_log>
  @type s3
  s3_bucket ${props.logBucket.bucketName}
  s3_region ap-northeast-1
  path ${this.instanceName}/
  time_slice_format %Y/%m/%d/%H-%M-%S
  <format>
    @type single_value
  </format>
  <buffer>
    @type file
    path /var/log/td-agent/s3
    timekey 5m
    timekey_wait 5m
    chunk_limit_size 256m
  </buffer>
</match>
EOF
)`,
      "",
      'echo "$CONFIG" | sudo tee /etc/td-agent/td-agent.conf >/dev/null',
      "sudo systemctl restart td-agent.service"
    );

なお、CloudFront ディストリビューションも distributionName として論理名を決めて使っています。

1
this.distributionName = `${props.serviceName}-distribution`;

小ネタですが EC2 インスタンスの L2 コンストラクトは T 系インスタンスタイプの CPUCredits を直接変更できません。このため以下のようにエスケープハッチunlimited から standard に変更しています。

1
2
const cfnInstance = instance.node.defaultChild as cdk.aws_ec2.CfnInstance;
cfnInstance.addPropertyOverride("CreditSpecification.CPUCredits", "standard");

Stream

Firehose ストリームを作ります。今回は L1 コンストラクトで動的パーティショニングを設定しています。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
 
export interface StreamProps {
  serviceName: string;
}
 
export class Stream extends Construct {
  readonly deliveryStream: cdk.aws_kinesisfirehose.CfnDeliveryStream;
 
  constructor(scope: Construct, id: string, props: StreamProps) {
    super(scope, id);
    const stack = cdk.Stack.of(this);
 
    ...
 
    // Create firehose delivery stream
    this.deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(this, "Firehose", {
      deliveryStreamName: `${props.serviceName}-firehose`,
      deliveryStreamType: "DirectPut",
      extendedS3DestinationConfiguration: {
        bucketArn: dstBucket.bucketArn,
        roleArn: firehoseRole.roleArn,
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: firehoseLogGroup.logGroupName,
          logStreamName: firehoseLogStream.logStreamName,
        },
        compressionFormat: "GZIP",
        prefix: "!{partitionKeyFromQuery:resource_type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/",
        errorOutputPrefix: "errors/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/",
        bufferingHints: {
          sizeInMBs: 128,
          intervalInSeconds: 300,
        },
        dynamicPartitioningConfiguration: {
          enabled: true,
        },
        processingConfiguration: {
          enabled: true,
          processors: [
            {
              type: "RecordDeAggregation",
              parameters: [
                {
                  parameterName: "SubRecordType",
                  parameterValue: "JSON",
                },
              ],
            },
            {
              type: "MetadataExtraction",
              parameters: [
                {
                  parameterName: "MetadataExtractionQuery",
                  parameterValue: "{resource_type: .resource_type}",
                },
                {
                  parameterName: "JsonParsingEngine",
                  parameterValue: "JQ-1.6",
                },
              ],
            },
            {
              type: "AppendDelimiterToRecord",
              parameters: [],
            },
          ],
        },
      },
    });
  }
}

以下が動的パーティションを含んだ Prefix の設定です。timestamp は組み込みのものをそのまま使い、先頭に resource_type を設定します。こうすることで、ログフォーマットごとにパーティションが分割されます。この値は処理時に実際のデータを読み込んで取得されます。

1
2
prefix: "!{partitionKeyFromQuery:resource_type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/",
errorOutputPrefix: "errors/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/",

上記のように設定した場合、動的パーティショニングは明示的に有効化する必要があります。

1
2
3
dynamicPartitioningConfiguration: {
  enabled: true,
},

データを読み込んでパーティションを設定するので、詳細な設定が必要です。MetadataExtraction で JSON をパースするためのパラメータを設定します。また RecordDeAggregationSubRecordTypeJSON に設定します。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
processingConfiguration: {
  enabled: true,
  processors: [
    {
      type: "RecordDeAggregation",
      parameters: [
        {
          parameterName: "SubRecordType",
          parameterValue: "JSON",
        },
      ],
    },
    {
      type: "MetadataExtraction",
      parameters: [
        {
          parameterName: "MetadataExtractionQuery",
          parameterValue: "{resource_type: .resource_type}",
        },
        {
          parameterName: "JsonParsingEngine",
          parameterValue: "JQ-1.6",
        },
      ],
    },
    {
      type: "AppendDelimiterToRecord",
      parameters: [],
    },
  ],
},

Handler

Lambda 関数のリソース設定です。

重要な点として、環境変数でリソースのマッピング情報を渡しています。今回は対象が少ないので環境変数を使っていますが、リソース数が多い場合は SSM パラメーターストアなどを検討しましょう。props で渡された各リソースの名前をキーに、リソース種別を値にしています。

01
02
03
04
05
06
07
08
09
10
11
12
13
// Create resoure map for determine log format
const resourceMap: { [kye: string]: string } = {};
resourceMap[props.alb.loadBalancerName] = "alb";
resourceMap[props.nlb.loadBalancerName] = "nlb";
resourceMap[props.instanceName] = "clf";
resourceMap[props.distributionName] = "cf";
 
...
 
environment: {
  FIREHOSE_STREAM_NAME: props.deliveryStream.deliveryStreamName!,
  RESOURCE_MAP: JSON.stringify(resourceMap),
},

関数エイリアスを発行し、イベントソースに SQS キューを設定します。maxConcurrency は関数の reservedConcurrentExecutions と同じにします。

01
02
03
04
05
06
07
08
09
10
11
12
13
// Update function alias
const alias = new cdk.aws_lambda.Alias(this, "LogParserAlias", {
  aliasName: "live",
  version: logParser.currentVersion,
})
 
// Set queue to lambda event source
alias.addEventSource(
  new cdk.aws_lambda_event_sources.SqsEventSource(queue, {
    batchSize: 10,
    maxConcurrency: 2,
  })
);

ここまでで定義したリソースをデプロイします。

1
2
cdk synth
cdk deploy

Lambda 関数

今回 Lambda 関数では、サポートしているログフォーマットであればどれがきても処理できるようにする必要があります。まず全体像です。

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package main
 
import (
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "strings"
 
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/firehose"
    "github.com/aws/aws-sdk-go-v2/service/firehose/types"
    "github.com/aws/aws-sdk-go-v2/service/s3"
 
    parser "github.com/nekrassov01/access-log-parser"
)
 
const (
    maxRecs = 400             // limit: 500
    maxSize = 3 * 1024 * 1024 // limit: 4MiB
)
 
var cfg aws.Config
 
func init() {
    var err error
    cfg, err = config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatalf("cannot load aws sdk config: %v", err)
    }
}
 
func getResourceMap(name string) (map[string]string, error) {
    s := os.Getenv(name)
    if s == "" {
        return nil, fmt.Errorf("failed to get value from environment variable \"%s\"", name)
    }
    m := make(map[string]string)
    if err := json.Unmarshal([]byte(s), &m); err != nil {
        return nil, err
    }
    return m, nil
}
 
func getParser(ctx context.Context, buf *bytes.Buffer, k, v string) (parser.Parser, error) {
    lineHandler := func(labels, values []string, isFirst bool) (string, error) {
        ls := make([]string, 0, len(labels)+2)
        ls = append(ls, "resource_type", "resource_name")
        ls = append(ls, labels...)
        vs := make([]string, 0, len(values)+2)
        vs = append(vs, v, k)
        vs = append(vs, values...)
        return parser.JSONLineHandler(ls, vs, isFirst)
    }
    opt := parser.Option{
        LineHandler: lineHandler,
    }
    ec2opt := parser.Option{
        LineHandler: lineHandler,
        Filters:     []string{"user_agent !~* ^ELB-HealthChecker"},
    }
    cfopt := parser.Option{
        LineHandler: lineHandler,
        SkipLines:   []int{1, 2},
    }
    var p parser.Parser
    switch v {
    case "clf":
        p = parser.NewApacheCLFRegexParser(ctx, buf, ec2opt)
    case "clfv":
        p = parser.NewApacheCLFWithVHostRegexParser(ctx, buf, ec2opt)
    case "s3":
        p = parser.NewS3RegexParser(ctx, buf, opt)
    case "cf":
        p = parser.NewCFRegexParser(ctx, buf, cfopt)
    case "alb":
        p = parser.NewALBRegexParser(ctx, buf, opt)
    case "nlb":
        p = parser.NewNLBRegexParser(ctx, buf, opt)
    case "clb":
        p = parser.NewCLBRegexParser(ctx, buf, opt)
    default:
        return nil, fmt.Errorf("invalid resource type: \"%s\"", v)
    }
    return p, nil
}
 
func splitData(data []byte) [][]byte {
    var (
        chunks [][]byte
        chunk  []byte
        count  int
        size   int
    )
    for _, record := range bytes.Split(data, []byte("\n")) {
        if count >= maxRecs || size+len(record)+1 > maxSize {
            chunks = append(chunks, chunk)
            chunk = nil
            count = 0
            size = 0
        }
        chunk = append(chunk, record...)
        chunk = append(chunk, '\n')
        count++
        size += len(record) + 1
    }
    if len(chunk) > 0 {
        chunks = append(chunks, chunk)
    }
    return chunks
}
 
func handleRequest(ctx context.Context, event events.SQSEvent) error {
    rmap, err := getResourceMap("RESOURCE_MAP")
    if err != nil {
        return err
    }
    buf := &bytes.Buffer{}
    s3client := s3.NewFromConfig(cfg)
    firehoseClient := firehose.NewFromConfig(cfg)
    for _, record := range event.Records {
        var s3event *events.S3Event
        if err := json.Unmarshal([]byte(record.Body), &s3event); err != nil {
            return err
        }
        for _, rec := range s3event.Records {
            key := rec.S3.Object.Key
            name := strings.Split(key, "/")[0]
            obj, err := s3client.GetObject(ctx, &s3.GetObjectInput{
                Bucket: aws.String(rec.S3.Bucket.Name),
                Key:    aws.String(key),
            })
            if err != nil {
                return err
            }
            j, _ := json.Marshal(map[string]string{"key": key})
            fmt.Println(string(j))
            v, ok := rmap[name]
            if !ok {
                fmt.Printf("skip because cannot determine log format of \"%s\"\n", name)
                continue
            }
            p, err := getParser(ctx, buf, name, v)
            if err != nil {
                return err
            }
            r, err := gzip.NewReader(obj.Body)
            if err != nil {
                return err
            }
            defer r.Close()
            result, err := p.Parse(r)
            if err != nil {
                return err
            }
            b, err := json.Marshal(result)
            if err != nil {
                return err
            }
            fmt.Println(string(b))
        }
    }
    if buf.Len() == 0 {
        return fmt.Errorf("abort process because buffer is empty")
    }
    for _, data := range splitData(buf.Bytes()) {
        resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{
            DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")),
            Records: []types.Record{
                {
                    Data: data,
                },
            },
        })
        if err != nil {
            return err
        }
        if resp != nil {
            b, err := json.Marshal(resp)
            if err != nil {
                return err
            }
            fmt.Println(string(b))
        }
    }
    return nil
}
 
func main() {
    lambda.Start(handleRequest)
}

以下の関数で環境変数からリソースのマッピング情報を取得し、JSON 文字列を Go の map に変換します。環境変数が取得できなければエラーになります。

01
02
03
04
05
06
07
08
09
10
11
func getResourceMap(name string) (map[string]string, error) {
    s := os.Getenv(name)
    if s == "" {
        return nil, fmt.Errorf("failed to get value from environment variable \"%s\"", name)
    }
    m := make(map[string]string)
    if err := json.Unmarshal([]byte(s), &m); err != nil {
        return nil, err
    }
    return m, nil
}

getParser ではログパーサーをセットアップします。以下では行に対する変換処理を担う parser.JSONLineHandler をカスタマイズし、resource_typeresource_name を追加しています。

1
2
3
4
5
6
7
8
9
lineHandler := func(labels, values []string, isFirst bool) (string, error) {
    ls := make([]string, 0, len(labels)+2)
    ls = append(ls, "resource_type", "resource_name")
    ls = append(ls, labels...)
    vs := make([]string, 0, len(values)+2)
    vs = append(vs, v, k)
    vs = append(vs, values...)
    return parser.JSONLineHandler(ls, vs, isFirst)
}

EC2 インスタンスの場合はヘルスチェックのログがあった場合に除外する設定を入れています。

1
2
3
4
ec2opt := parser.Option{
    LineHandler: lineHandler,
    Filters:     []string{"user_agent !~* ^ELB-HealthChecker"},
}

CloudFront の場合はヘッダー2 行を除外する設定を入れています。

1
2
3
4
cfopt := parser.Option{
    LineHandler: lineHandler,
    SkipLines:   []int{1, 2},
}

あとは resource_type に応じた適切なログパーサーを設定します。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
var p parser.Parser
switch v {
case "clf":
    p = parser.NewApacheCLFRegexParser(ctx, buf, ec2opt)
case "clfv":
    p = parser.NewApacheCLFWithVHostRegexParser(ctx, buf, ec2opt)
case "s3":
    p = parser.NewS3RegexParser(ctx, buf, opt)
case "cf":
    p = parser.NewCFRegexParser(ctx, buf, cfopt)
case "alb":
    p = parser.NewALBRegexParser(ctx, buf, opt)
case "nlb":
    p = parser.NewNLBRegexParser(ctx, buf, opt)
case "clb":
    p = parser.NewCLBRegexParser(ctx, buf, opt)
default:
    return nil, fmt.Errorf("invalid resource type: \"%s\"", v)
}

PutRecordBatch API の制限を考慮したデータ分割もしています。以下の制限です。

The PutRecordBatch operation can take up to 500 records per call or 4 MiB per call, whichever is smaller. This quota cannot be changed.
(訳) PutRecordBatch オペレーションは、1 コールあたり 500 レコードまたは 4MiB のいずれか小さい方まで取ることができる。このクォータは変更できない。

まず、余裕を持った上限値を設定しておきます。

1
2
3
4
const (
    maxRecs = 400             // limit: 500
    maxSize = 3 * 1024 * 1024 // limit: 4MiB
)

splitData 関数ではレコード数と容量を検査してバイト列を分割します。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<br />func splitData(data []byte) [][]byte {
    var (
        chunks [][]byte
        chunk  []byte
        count  int
        size   int
    )
    for _, record := range bytes.Split(data, []byte("\n")) {
        if count >= maxRecs || size+len(record)+1 > maxSize {
            chunks = append(chunks, chunk)
            chunk = nil
            count = 0
            size = 0
        }
        chunk = append(chunk, record...)
        chunk = append(chunk, '\n')
        count++
        size += len(record) + 1
    }
    if len(chunk) > 0 {
        chunks = append(chunks, chunk)
    }
    return chunks
}

handleRequest 関数では splitData で分割した塊ごとに PutRecordBatch API をコールすることでエラーを回避します。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func handleRequest(ctx context.Context, event events.SQSEvent) error {
    ...
 
    for _, data := range splitData(buf.Bytes()) {
        resp, err := firehoseClient.PutRecordBatch(ctx, &firehose.PutRecordBatchInput{
            DeliveryStreamName: aws.String(os.Getenv("FIREHOSE_STREAM_NAME")),
            Records: []types.Record{
                {
                    Data: data,
                },
            },
        })
        if err != nil {
            return err
        }
        if resp != nil {
            b, err := json.Marshal(resp)
            if err != nil {
                return err
            }
            fmt.Println(string(b))
        }
    }
    return nil
}

動作確認

各リソースに対してブラウザアクセスし、ログが収集されるか確認します。

生ログ

プレフィックスが想定通りに設定されています。

変換後のログ

こちらも Firehose の動的パーティショニングによってログフォーマットごとに振り分けられています。あとは AWS Glue を使ってこのプレフィックスごとにテーブル化すれば、整理されたログの状態が実現できそうです。まだ検証できていませんが Glue の ETL 処理で統合テーブルをセットアップしてもよさそうですね。

resource_typeresource_name も問題なく付与されています (一部マスクしています)

1
2
3
4
5
{"resource_type":"clf","resource_name":"kawashima-test-web-instance","remote_host":"0.0.0.0","remote_logname":"-","remote_user":"-","datetime":"[24/Mar/2024:16:35:20 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"304","size":"-","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}
{"resource_type":"clf","resource_name":"kawashima-test-web-instance","remote_host":"0.0.0.0","remote_logname":"-","remote_user":"-","datetime":"[24/Mar/2024:16:35:21 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"304","size":"-","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}
{"resource_type":"clf","resource_name":"kawashima-test-web-instance","remote_host":"0.0.0.0","remote_logname":"-","remote_user":"-","datetime":"[24/Mar/2024:16:35:21 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"304","size":"-","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}
{"resource_type":"clf","resource_name":"kawashima-test-web-instance","remote_host":"0.0.0.0","remote_logname":"-","remote_user":"-","datetime":"[24/Mar/2024:16:35:22 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"304","size":"-","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}
{"resource_type":"clf","resource_name":"kawashima-test-web-instance","remote_host":"0.0.0.0","remote_logname":"-","remote_user":"-","datetime":"[24/Mar/2024:16:35:22 +0000]","method":"GET","request_uri":"/","protocol":"HTTP/1.1","status":"304","size":"-","referer":"-","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}

おわりに

AWS でのログ基盤構築において、フォーマットの異なる様々なログの集約と構造化までをやってみました。次回は AWS Glue を使って ETL 処理したり Amazon Athena を使ってクエリしたりといった、よりデータ分析っぽい内容を検証してみたいと思います。

私自身データ分析系の AWS サービスは今まで使ってこなかったので、構成に関してはまだ最適化の余地があると思います。が、現在のところ多様なログを単一のフローに沿って処理できているので、比較的シンプルな構成になっているのではないでしょうか。