下記のの記事で、Lambda(Python)を使って定期的にSQSのメッセージを受信して
削除する仕組みを作ってみました。

SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)

また下記の記事では、この仕組のLambdaのコード(Python)の改善もしました。

Boto3(Python)で”Service Resource”を使ってみた(Lambda)

しかしSQSのメッセージを受信して削除しているだけなので、実質、何もしてません。

今回は下図のように、SQSのメッセージを受信して「DynamoDBにデータをPUT」して
削除するようにしてみました。

Untitled(5) (2)

Lambdaの設定

対象のコードは下記のようになりました。

import json
import uuid
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
         logger.info(event)

         queue = boto3.resource('sqs').get_queue_by_name(
             QueueName = queueName
         )

         messages = queue.receive_messages(
             MaxNumberOfMessages = maxNumberOfMessages
         )

         entries = []
         items = []
         for message in messages:
              entries.append({
                   "Id": message.message_id,
                   "ReceiptHandle": message.receipt_handle
              })
              items.append({
                   "uuid": uuid.uuid1().urn,
                   "key1": json.loads(message.body)['key1'],
                   "key2": json.loads(message.body)['key2'],
                   "key3": json.loads(message.body)['key3']
              })

         table = boto3.resource('dynamodb').Table('Test')
         with table.batch_writer() as batch:
             for item in items:
                  batch.put_item(Item = item)

         response = {}
         if len(entries) != 0:
             response = queue.delete_messages(
                  Entries = entries
             )

         logger.info(response)
         return response

     except Exception as e:
          logger.error(e)
          raise e

IAMの設定

LambdaからDynamoDBを操作できるようにするために、
IAMロール(lambda_basic_execution)にポリシーをアタッチします。

2015-10-10_00-58-29
2015-10-10_01-00-34
2015-10-10_01-01-51

DynamoDBの準備

こんな感じに準備しています。
2015-10-10_02-12-14

テスト

適当にAPI Gatewayに対してCURLにてデータをPOSTします。
(Post後LambdaがSQSにエンキューします)

 curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "be6ea76d033276891dcd884cf81a8602", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "43fd09b4-e772-55e3-98b2-47263fc9b8b2"}, "MessageId": "f0e908a3-f08d-4ea9-9d22-41d09a011e8b"}
$ curl -d '{"key1":"value4","key2":"value5","key3":"value6"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "0593cbab7f22d4166ce2de3c2352e869", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "5ec87932-6f15-576e-9a91-37a0b7c1205a"}, "MessageId": "6c70aac1-525c-4d72-b1e4-3f891577b9d5"}
$ curl -d '{"key1":"value7","key2":"value8","key3":"value9"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "ee7ad5a858124910f959a2823d4ceab0", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "935d4868-f9d4-5e1b-b2fd-06fa21330d55"}, "MessageId": "7ae31368-c969-4958-998f-d11ecc620651"}

数分待つと(5分ごとにLambdaがSQSをポーリングしているので)、
DynamoDBにPostしたデータがPUTされていることがわかります。

2015-10-10_02-24-56

元記事はこちら

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする