Amazon CloudWatch EventsのイベントルールでターゲットをAmazon SQSにしてAWS Lambdaでイベントソースにして処理するAWS CloudFormationのテンプレートをつくってみた

AWSマネジメントコンソールからだと簡単に設定できましたが、AWS CloudFormationのテンプレート化するのにいろいろとハマったのでメモ。

リソース

必要最低限となる構成はこんな感じになりました。

利用するサービスは以下になります。

  • Amazon S3
  • AWS CloudTrail
  • Amazon CloudWatch Events
  • Amazon SQS
  • AWS Lambda
  • AWS CloudFormation(リソース管理用)

ポイント

先にポイントをいくつかあげてみます。
完成形のテンプレートはこのあとにおいてます。

Amazon S3のバケットを複数用意する

Amazon CloudWatch EventsでAmazon S3のイベントを扱うのにAWS CloudTrailも必要になります。

Amazon S3 ソースの CloudWatch イベント ルールを作成する (コンソール) – CodePipeline
https://docs.aws.amazon.com/ja_jp/codepipeline/latest/userguide/create-cloudtrail-S3-source-console.html

AWS CloudTrail 証跡を作成にはログファイルを出力するAmazon S3のバケットがいりますが、これをイベントを扱いたいバケットにしてしまうと。。。あとはわかりますね。無限ループに陥ります。

また、AWS CloudTrailでログを出力しない設定にすると、イベントが発火しませんでした。

AWS CloudTrailのログファイルを保存するキー名は固定

ログファイルを保存する先はバケット名/AWSLogs/AWSアカウントID/*と指定する必要があります。

テンプレート抜粋

  CloudTrailBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref OutputBucket
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: s3:GetBucketAcl
            Resource: !GetAtt OutputBucket.Arn
          - Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: s3:PutObject
            Resource: !Join
              - ""
              - - !GetAtt OutputBucket.Arn
                - "/AWSLogs/"
                - !Ref "AWS::AccountId"
                - "/*"
            Condition:
              StringEquals:
                s3:x-amz-acl: bucket-owner-full-control

AWSLogsを別名にしてみたらAWS CloudFormationのスタック作成でエラーになりました。

Incorrect S3 bucket policy is detected for bucket: <ProjectName>-output (Service: AWSCloudTrail; Status Code: 400; Error Code: InsufficientS3BucketPolicyException; Request ID: 3a9a7575-5226-4f19-b62a-737e87acc9b8)

AWSアカウントIDを指定せずにバケット名/AWSLogs/* とするとイベントが発火しませんでした。

AWS Lambda関数でメッセージの削除はしなくて良い

AWS Lambda関数でAmazon SQSを自前でポーリングして処理する場合、正常に処理が完了したらメッセージを削除する必要があったのですが、イベントソースにするとそれも必要なくなるみたいです。

AWS LambdaがSQSをイベントソースとしてサポートしました! | Developers.IO
https://dev.classmethod.jp/articles/aws-lambda-support-sqs-event-source/

次に関数コードを入力します。サンプルの関数はチュートリアル通り以下の内容として保存します。ここでSQSメッセージの削除処理を入れていないことが分かります。

最初はチュートリアルのサンプルだからかな?と思ってましたが実際に動作させると正常終了時にメッセージが勝手に削除されました。こりゃ便利。

なので実装はキューの情報からS3バケットに保存されたオブジェクトのキーを取得して出力しているだけです。

テンプレート抜粋

  ReceiveQueFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub "${ProjectName}-ReceiveQueFunction"
      Handler: "index.lambda_handler"
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          from __future__ import print_function
          import json
          import os
          import boto3
          def lambda_handler(event, context):
              for record in event["Records"]:
                  requestParameters = json.loads(record["body"])["detail"]["requestParameters"]
                  print(str(requestParameters))
      Runtime: "python3.7"
      Timeout: "60"
      ReservedConcurrentExecutions: 3

AWS Lambda関数の同時実行数を調整する

上記テンプレートでReservedConcurrentExecutions: 3と同時実行数を指定していますが、こちらはケース・バイ・ケースで指定する必要があります。

AWS::Lambda::Function – AWS CloudFormation
https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-reservedconcurrentexecutions

大量のキューをさばく必要がある場合、同時実行数の制限まで全力でポーリングしてくれます。
なので同時実行数を指定していないと、標準設定の1,000まで同時実行してくれます。アカウントの同時実行数は最大1,000となりますので、もし他にも関数がある場合、影響する可能性があるのでご注意ください。

イベントルールのターゲット指定でバケット名やキーがプレフィックス指定できる

こちらは下記記事をご参考ください。地味に便利です。

Amazon CloudWatch EventsのルールでAmazon S3のキーをプレフィックス指定できた – Qiita
https://cloudpack.media/52397

今回はキーをprefix: hoge/とすることでs3://バケット名/hoge/ 配下にオブジェクトがPUTされた場合にイベントが発火する設定にしました。

テンプレート抜粋

  CloudWatchEventRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub "${ProjectName}-EventRule"
      EventPattern:
        source:
          - aws.s3
        detail-type:
          - "AWS API Call via CloudTrail"
        detail:
          eventSource:
            - s3.amazonaws.com
          eventName:
            - CopyObject
            - PutObject
            - CompleteMultipartUpload
          requestParameters:
            bucketName:
              - !Ref InputBucket
            key:
              - prefix: hoge/
      Targets:
        - Arn: !GetAtt S3EventQueue.Arn
          Id: !Sub "${ProjectName}-TarfgetQueue"

SQSのキューポリシーを設定する

今回、一番ドハマリしました。
キューポリシーがなくてもリソースは作成できるのですが、それだとバケットにオブジェクトをPUTしてもイベントが発火しませんでした。AWSのドキュメントを漁ってみてもそれらしき記述が見当たらずでしたが、下記のフォーラムに情報があり知ることができました。

AWS Developer Forums: CloudWatch event rule not sending to …
https://forums.aws.amazon.com/message.jspa?messageID=742808

AWSマネジメントコンソールでぽちぽちと設定する場合にはポリシーを勝手に作成してくれるので、テンプレート化する際にハマりやすいポイントだったみたいです。

テンプレート抜粋

 SQSQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              AWS: "*"
            Action:
              - "sqs:SendMessage"
            Resource:
              - !GetAtt S3EventQueue.Arn
            Condition:
              ArnEquals:
                "aws:SourceArn": !GetAtt CloudWatchEventRule.Arn
      Queues:
        - Ref: S3EventQueue

テンプレート

ちょっと長いですがテンプレートになります。

AWSTemplateFormatVersion: "2010-09-09"
Parameters:
  ProjectName:
    Type: String
    Default: "<お好みで>"

Resources:
  InputBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub "${ProjectName}-input"
      AccessControl: Private
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True

  OutputBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub "${ProjectName}-output"
      AccessControl: Private
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True

  S3EventQueue:
    Type: AWS::SQS::Queue
    Properties:
      DelaySeconds: 0
      VisibilityTimeout: 360

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${ProjectName}-LambdaRolePolicy"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: !Sub "${ProjectName}-LambdaRolePolices"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:*
                Resource: "*"
        - PolicyName: !Sub "${ProjectName}-LambdaRoleSQSPolices"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
            - Effect: Allow
              Action:
              - sqs:ReceiveMessage
              - sqs:DeleteMessage
              - sqs:GetQueueAttributes
              - sqs:ChangeMessageVisibility
              Resource: !GetAtt S3EventQueue.Arn

  ReceiveQueFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub "${ProjectName}-ReceiveQueFunction"
      Handler: "index.lambda_handler"
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          from __future__ import print_function
          import json
          import os
          import boto3
          def lambda_handler(event, context):
              for record in event["Records"]:
                  requestParameters = json.loads(record["body"])["detail"]["requestParameters"]
                  print(str(requestParameters))
      Runtime: "python3.7"
      Timeout: "60"
      ReservedConcurrentExecutions: 3

  CloudTrailBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref OutputBucket
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: s3:GetBucketAcl
            Resource: !GetAtt OutputBucket.Arn
          - Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: s3:PutObject
            Resource: !Join
              - ""
              - - !GetAtt OutputBucket.Arn
                - "/AWSLogs/"
                - !Ref "AWS::AccountId"
                - "/*"
            Condition:
              StringEquals:
                s3:x-amz-acl: bucket-owner-full-control

  CloudTrail:
    Type: AWS::CloudTrail::Trail
    DependsOn:
      - CloudTrailBucketPolicy
    Properties:
      TrailName: !Sub "${ProjectName}-Trail"
      S3BucketName: !Ref OutputBucket
      EventSelectors:
        - DataResources:
            - Type: AWS::S3::Object
              Values:
                - Fn::Sub:
                  - "${InputBucketArn}/"
                  - InputBucketArn: !GetAtt InputBucket.Arn
          ReadWriteType: WriteOnly
          IncludeManagementEvents: false
      IncludeGlobalServiceEvents: true
      IsLogging: true
      IsMultiRegionTrail: false

  CloudWatchEventRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub "${ProjectName}-EventRule"
      EventPattern:
        source:
          - aws.s3
        detail-type:
          - "AWS API Call via CloudTrail"
        detail:
          eventSource:
            - s3.amazonaws.com
          eventName:
            - CopyObject
            - PutObject
            - CompleteMultipartUpload
          requestParameters:
            bucketName:
              - !Ref InputBucket
            key:
              - prefix: hoge/
      Targets:
        - Arn: !GetAtt S3EventQueue.Arn
          Id: !Sub "${ProjectName}-TarfgetQueue"

  SQSQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              AWS: "*"
            Action:
              - "sqs:SendMessage"
            Resource:
              - !GetAtt S3EventQueue.Arn
            Condition:
              ArnEquals:
                "aws:SourceArn": !GetAtt CloudWatchEventRule.Arn
      Queues:
        - Ref: S3EventQueue

  LambdaFunctionEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    DependsOn:
      - S3EventQueue
      - ReceiveQueFunction
    Properties:
      BatchSize: 10
      Enabled: true
      EventSourceArn: !GetAtt S3EventQueue.Arn
      FunctionName: !GetAtt ReceiveQueFunction.Arn

スタック作成して動かしてみる

最後にざくっと検証してみます。

# リソースを作成
> cd テンプレートファイルがある場所

> aws cloudformation create-stack \
  --stack-name <お好みで> \
  --template-body file://<テンプレートファイル名> \
  --capabilities CAPABILITY_NAMED_IAM \
  --region <お好みの> \
  --parameters '[
    {
      "ParameterKey": "ProjectName",
      "ParameterValue": "<お好みで>"
    }
  ]'



{
    "StackId": "arn:aws:cloudformation:<お好みのリージョン>:xxxxxxxxxxxx:stack/<お好みのスタック名>/18686480-6f21-11ea-bcf3-020de04cec9a"
}

# ファイルをアップロード
> touch hoge.txt

# hogeキー配下にアップロードしない
> aws s3 cp hoge.txt s3://<ProjectName>-input/

upload: ./hoge.txt to s3://<ProjectName>-input/hoge/hoge.txt


> aws s3 cp hoge.txt s3://<ProjectName>-input/hoge/

upload: ./hoge.txt to s3://<ProjectName>-input/hoge/hoge.txt


> aws s3 ls --recursive s3://<ProjectName>-input

2020-03-26 06:28:03          0 hoge.txt
2020-03-26 06:26:12          0 hoge/hoge.txt


# Lambda関数のログを確認
> aws logs get-log-events \
  --region <お好みのリージョン> \
  --log-group-name '/aws/lambda/<ProjectName>-ReceiveQueFunction' \
  --log-stream-name '2020/03/26/[$LATEST]ae8735ef9a1c46c38ab241f23a26b384' \
  --query "events[].[message]" \
  --output text


START RequestId: 10f98cf9-c39b-531b-9514-da0f8ea72a42 Version: $LATEST

{'bucketName': '<ProjectName>-input', 'Host': '<ProjectName>-input.s3.<お好みのリージョン>.amazonaws.com', 'key': 'hoge/hoge.txt'}

END RequestId: 10f98cf9-c39b-531b-9514-da0f8ea72a42

REPORT RequestId: 10f98cf9-c39b-531b-9514-da0f8ea72a42  Duration: 1.85 ms       Billed Duration: 100 ms Memory Size: 128 MB
     Max Memory Used: 69 MB  Init Duration: 275.58 ms

やったぜ。

まとめ

AWSマネジメントコンソールで設定すると比較的かんたんに設定できるのですが、CFnのテンプレート化する際にはそこそこ大変でしたが、良い知見を得ることができました。

参考

Amazon S3 ソースの CloudWatch イベント ルールを作成する (コンソール) – CodePipeline
https://docs.aws.amazon.com/ja_jp/codepipeline/latest/userguide/create-cloudtrail-S3-source-console.html

AWS LambdaがSQSをイベントソースとしてサポートしました! | Developers.IO
https://dev.classmethod.jp/articles/aws-lambda-support-sqs-event-source/

AWS::Lambda::Function – AWS CloudFormation
https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-reservedconcurrentexecutions

Amazon CloudWatch EventsのルールでAmazon S3のキーをプレフィックス指定できた – Qiita
https://cloudpack.media/52397

AWS Developer Forums: CloudWatch event rule not sending to …
https://forums.aws.amazon.com/message.jspa?messageID=742808

Amazon CloudWatch LogsのログをAWS CLIでいい感じに取得する – Qiita
https://cloudpack.media/50416

AWS CLIを使ってAWS Lambdaのログ取得時に注意したいこと | Developers.IO
https://dev.classmethod.jp/articles/note-log-of-lambda-using-awscli/

元記事はこちら

Amazon CloudWatch EventsのイベントルールでターゲットをAmazon SQSにしてAWS Lambdaでイベントソースにして処理するAWS CloudFormationのテンプレートをつくってみた