Amazon CloudWatch EventsのイベントルールでターゲットをAmazon SQSにしてAWS Lambdaでイベントソースにして処理するAWS CloudFormationのテンプレートをつくってみた
AWSマネジメントコンソールからだと簡単に設定できましたが、AWS CloudFormationのテンプレート化するのにいろいろとハマったのでメモ。
リソース
必要最低限となる構成はこんな感じになりました。
利用するサービスは以下になります。
- Amazon S3
- AWS CloudTrail
- Amazon CloudWatch Events
- Amazon SQS
- AWS Lambda
- AWS CloudFormation(リソース管理用)
ポイント
先にポイントをいくつかあげてみます。
完成形のテンプレートはこのあとにおいてます。
Amazon S3のバケットを複数用意する
Amazon CloudWatch EventsでAmazon S3のイベントを扱うのにAWS CloudTrailも必要になります。
Amazon S3 ソースの CloudWatch イベント ルールを作成する (コンソール) – CodePipeline
https://docs.aws.amazon.com/ja_jp/codepipeline/latest/userguide/create-cloudtrail-S3-source-console.html
AWS CloudTrail 証跡を作成にはログファイルを出力するAmazon S3のバケットがいりますが、これをイベントを扱いたいバケットにしてしまうと。。。あとはわかりますね。無限ループに陥ります。
また、AWS CloudTrailでログを出力しない設定にすると、イベントが発火しませんでした。
AWS CloudTrailのログファイルを保存するキー名は固定
ログファイルを保存する先はバケット名/AWSLogs/AWSアカウントID/*
と指定する必要があります。
テンプレート抜粋
CloudTrailBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref OutputBucket PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: s3:GetBucketAcl Resource: !GetAtt OutputBucket.Arn - Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: s3:PutObject Resource: !Join - "" - - !GetAtt OutputBucket.Arn - "/AWSLogs/" - !Ref "AWS::AccountId" - "/*" Condition: StringEquals: s3:x-amz-acl: bucket-owner-full-control
AWSLogs
を別名にしてみたらAWS CloudFormationのスタック作成でエラーになりました。
Incorrect S3 bucket policy is detected for bucket: <ProjectName>-output (Service: AWSCloudTrail; Status Code: 400; Error Code: InsufficientS3BucketPolicyException; Request ID: 3a9a7575-5226-4f19-b62a-737e87acc9b8)
AWSアカウントID
を指定せずにバケット名/AWSLogs/*
とするとイベントが発火しませんでした。
AWS Lambda関数でメッセージの削除はしなくて良い
AWS Lambda関数でAmazon SQSを自前でポーリングして処理する場合、正常に処理が完了したらメッセージを削除する必要があったのですが、イベントソースにするとそれも必要なくなるみたいです。
AWS LambdaがSQSをイベントソースとしてサポートしました! | Developers.IO
https://dev.classmethod.jp/articles/aws-lambda-support-sqs-event-source/
次に関数コードを入力します。サンプルの関数はチュートリアル通り以下の内容として保存します。ここでSQSメッセージの削除処理を入れていないことが分かります。
最初はチュートリアルのサンプルだからかな?と思ってましたが実際に動作させると正常終了時にメッセージが勝手に削除されました。こりゃ便利。
なので実装はキューの情報からS3バケットに保存されたオブジェクトのキーを取得して出力しているだけです。
テンプレート抜粋
ReceiveQueFunction: Type: AWS::Lambda::Function Properties: FunctionName: !Sub "${ProjectName}-ReceiveQueFunction" Handler: "index.lambda_handler" Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | from __future__ import print_function import json import os import boto3 def lambda_handler(event, context): for record in event["Records"]: requestParameters = json.loads(record["body"])["detail"]["requestParameters"] print(str(requestParameters)) Runtime: "python3.7" Timeout: "60" ReservedConcurrentExecutions: 3
AWS Lambda関数の同時実行数を調整する
上記テンプレートでReservedConcurrentExecutions: 3
と同時実行数を指定していますが、こちらはケース・バイ・ケースで指定する必要があります。
AWS::Lambda::Function – AWS CloudFormation
https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-reservedconcurrentexecutions
大量のキューをさばく必要がある場合、同時実行数の制限まで全力でポーリングしてくれます。
なので同時実行数を指定していないと、標準設定の1,000
まで同時実行してくれます。アカウントの同時実行数は最大1,000
となりますので、もし他にも関数がある場合、影響する可能性があるのでご注意ください。
イベントルールのターゲット指定でバケット名やキーがプレフィックス指定できる
こちらは下記記事をご参考ください。地味に便利です。
Amazon CloudWatch EventsのルールでAmazon S3のキーをプレフィックス指定できた – Qiita
https://cloudpack.media/52397
今回はキーをprefix: hoge/
とすることでs3://バケット名/hoge/
配下にオブジェクトがPUTされた場合にイベントが発火する設定にしました。
テンプレート抜粋
CloudWatchEventRule: Type: AWS::Events::Rule Properties: Name: !Sub "${ProjectName}-EventRule" EventPattern: source: - aws.s3 detail-type: - "AWS API Call via CloudTrail" detail: eventSource: - s3.amazonaws.com eventName: - CopyObject - PutObject - CompleteMultipartUpload requestParameters: bucketName: - !Ref InputBucket key: - prefix: hoge/ Targets: - Arn: !GetAtt S3EventQueue.Arn Id: !Sub "${ProjectName}-TarfgetQueue"
SQSのキューポリシーを設定する
今回、一番ドハマリしました。
キューポリシーがなくてもリソースは作成できるのですが、それだとバケットにオブジェクトをPUTしてもイベントが発火しませんでした。AWSのドキュメントを漁ってみてもそれらしき記述が見当たらずでしたが、下記のフォーラムに情報があり知ることができました。
AWS Developer Forums: CloudWatch event rule not sending to …
https://forums.aws.amazon.com/message.jspa?messageID=742808
AWSマネジメントコンソールでぽちぽちと設定する場合にはポリシーを勝手に作成してくれるので、テンプレート化する際にハマりやすいポイントだったみたいです。
テンプレート抜粋
SQSQueuePolicy: Type: AWS::SQS::QueuePolicy Properties: PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: AWS: "*" Action: - "sqs:SendMessage" Resource: - !GetAtt S3EventQueue.Arn Condition: ArnEquals: "aws:SourceArn": !GetAtt CloudWatchEventRule.Arn Queues: - Ref: S3EventQueue
テンプレート
ちょっと長いですがテンプレートになります。
AWSTemplateFormatVersion: "2010-09-09" Parameters: ProjectName: Type: String Default: "<お好みで>" Resources: InputBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub "${ProjectName}-input" AccessControl: Private PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True OutputBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub "${ProjectName}-output" AccessControl: Private PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True S3EventQueue: Type: AWS::SQS::Queue Properties: DelaySeconds: 0 VisibilityTimeout: 360 LambdaExecutionRole: Type: AWS::IAM::Role Properties: RoleName: !Sub "${ProjectName}-LambdaRolePolicy" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: "sts:AssumeRole" Path: "/" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyName: !Sub "${ProjectName}-LambdaRolePolices" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - s3:* Resource: "*" - PolicyName: !Sub "${ProjectName}-LambdaRoleSQSPolices" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - sqs:ReceiveMessage - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:ChangeMessageVisibility Resource: !GetAtt S3EventQueue.Arn ReceiveQueFunction: Type: AWS::Lambda::Function Properties: FunctionName: !Sub "${ProjectName}-ReceiveQueFunction" Handler: "index.lambda_handler" Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | from __future__ import print_function import json import os import boto3 def lambda_handler(event, context): for record in event["Records"]: requestParameters = json.loads(record["body"])["detail"]["requestParameters"] print(str(requestParameters)) Runtime: "python3.7" Timeout: "60" ReservedConcurrentExecutions: 3 CloudTrailBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref OutputBucket PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: s3:GetBucketAcl Resource: !GetAtt OutputBucket.Arn - Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: s3:PutObject Resource: !Join - "" - - !GetAtt OutputBucket.Arn - "/AWSLogs/" - !Ref "AWS::AccountId" - "/*" Condition: StringEquals: s3:x-amz-acl: bucket-owner-full-control CloudTrail: Type: AWS::CloudTrail::Trail DependsOn: - CloudTrailBucketPolicy Properties: TrailName: !Sub "${ProjectName}-Trail" S3BucketName: !Ref OutputBucket EventSelectors: - DataResources: - Type: AWS::S3::Object Values: - Fn::Sub: - "${InputBucketArn}/" - InputBucketArn: !GetAtt InputBucket.Arn ReadWriteType: WriteOnly IncludeManagementEvents: false IncludeGlobalServiceEvents: true IsLogging: true IsMultiRegionTrail: false CloudWatchEventRule: Type: AWS::Events::Rule Properties: Name: !Sub "${ProjectName}-EventRule" EventPattern: source: - aws.s3 detail-type: - "AWS API Call via CloudTrail" detail: eventSource: - s3.amazonaws.com eventName: - CopyObject - PutObject - CompleteMultipartUpload requestParameters: bucketName: - !Ref InputBucket key: - prefix: hoge/ Targets: - Arn: !GetAtt S3EventQueue.Arn Id: !Sub "${ProjectName}-TarfgetQueue" SQSQueuePolicy: Type: AWS::SQS::QueuePolicy Properties: PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: AWS: "*" Action: - "sqs:SendMessage" Resource: - !GetAtt S3EventQueue.Arn Condition: ArnEquals: "aws:SourceArn": !GetAtt CloudWatchEventRule.Arn Queues: - Ref: S3EventQueue LambdaFunctionEventSourceMapping: Type: AWS::Lambda::EventSourceMapping DependsOn: - S3EventQueue - ReceiveQueFunction Properties: BatchSize: 10 Enabled: true EventSourceArn: !GetAtt S3EventQueue.Arn FunctionName: !GetAtt ReceiveQueFunction.Arn
スタック作成して動かしてみる
最後にざくっと検証してみます。
# リソースを作成 > cd テンプレートファイルがある場所 > aws cloudformation create-stack \ --stack-name <お好みで> \ --template-body file://<テンプレートファイル名> \ --capabilities CAPABILITY_NAMED_IAM \ --region <お好みの> \ --parameters '[ { "ParameterKey": "ProjectName", "ParameterValue": "<お好みで>" } ]' { "StackId": "arn:aws:cloudformation:<お好みのリージョン>:xxxxxxxxxxxx:stack/<お好みのスタック名>/18686480-6f21-11ea-bcf3-020de04cec9a" } # ファイルをアップロード > touch hoge.txt # hogeキー配下にアップロードしない > aws s3 cp hoge.txt s3://<ProjectName>-input/ upload: ./hoge.txt to s3://<ProjectName>-input/hoge/hoge.txt > aws s3 cp hoge.txt s3://<ProjectName>-input/hoge/ upload: ./hoge.txt to s3://<ProjectName>-input/hoge/hoge.txt > aws s3 ls --recursive s3://<ProjectName>-input 2020-03-26 06:28:03 0 hoge.txt 2020-03-26 06:26:12 0 hoge/hoge.txt # Lambda関数のログを確認 > aws logs get-log-events \ --region <お好みのリージョン> \ --log-group-name '/aws/lambda/<ProjectName>-ReceiveQueFunction' \ --log-stream-name '2020/03/26/[$LATEST]ae8735ef9a1c46c38ab241f23a26b384' \ --query "events[].[message]" \ --output text START RequestId: 10f98cf9-c39b-531b-9514-da0f8ea72a42 Version: $LATEST {'bucketName': '<ProjectName>-input', 'Host': '<ProjectName>-input.s3.<お好みのリージョン>.amazonaws.com', 'key': 'hoge/hoge.txt'} END RequestId: 10f98cf9-c39b-531b-9514-da0f8ea72a42 REPORT RequestId: 10f98cf9-c39b-531b-9514-da0f8ea72a42 Duration: 1.85 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 69 MB Init Duration: 275.58 ms
やったぜ。
まとめ
AWSマネジメントコンソールで設定すると比較的かんたんに設定できるのですが、CFnのテンプレート化する際にはそこそこ大変でしたが、良い知見を得ることができました。
参考
Amazon S3 ソースの CloudWatch イベント ルールを作成する (コンソール) – CodePipeline
https://docs.aws.amazon.com/ja_jp/codepipeline/latest/userguide/create-cloudtrail-S3-source-console.html
AWS LambdaがSQSをイベントソースとしてサポートしました! | Developers.IO
https://dev.classmethod.jp/articles/aws-lambda-support-sqs-event-source/
AWS::Lambda::Function – AWS CloudFormation
https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-reservedconcurrentexecutions
Amazon CloudWatch EventsのルールでAmazon S3のキーをプレフィックス指定できた – Qiita
https://cloudpack.media/52397
AWS Developer Forums: CloudWatch event rule not sending to …
https://forums.aws.amazon.com/message.jspa?messageID=742808
Amazon CloudWatch LogsのログをAWS CLIでいい感じに取得する – Qiita
https://cloudpack.media/50416
AWS CLIを使ってAWS Lambdaのログ取得時に注意したいこと | Developers.IO
https://dev.classmethod.jp/articles/note-log-of-lambda-using-awscli/