下記のの記事で、Lambda(Python)を使って定期的にSQSのメッセージを受信して
削除する仕組みを作ってみました。
SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)
また下記の記事では、この仕組のLambdaのコード(Python)の改善もしました。
Boto3(Python)で”Service Resource”を使ってみた(Lambda)
しかしSQSのメッセージを受信して削除しているだけなので、実質、何もしてません。
今回は下図のように、SQSのメッセージを受信して「DynamoDBにデータをPUT」して
削除するようにしてみました。
Lambdaの設定
対象のコードは下記のようになりました。
import json import uuid import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) queueName = 'test' maxNumberOfMessages = 10 def lambda_handler(event, context): try: logger.info(event) queue = boto3.resource('sqs').get_queue_by_name( QueueName = queueName ) messages = queue.receive_messages( MaxNumberOfMessages = maxNumberOfMessages ) entries = [] items = [] for message in messages: entries.append({ "Id": message.message_id, "ReceiptHandle": message.receipt_handle }) items.append({ "uuid": uuid.uuid1().urn, "key1": json.loads(message.body)['key1'], "key2": json.loads(message.body)['key2'], "key3": json.loads(message.body)['key3'] }) table = boto3.resource('dynamodb').Table('Test') with table.batch_writer() as batch: for item in items: batch.put_item(Item = item) response = {} if len(entries) != 0: response = queue.delete_messages( Entries = entries ) logger.info(response) return response except Exception as e: logger.error(e) raise e
IAMの設定
LambdaからDynamoDBを操作できるようにするために、
IAMロール(lambda_basic_execution)にポリシーをアタッチします。
DynamoDBの準備
こんな感じに準備しています。
テスト
適当にAPI Gatewayに対してCURLにてデータをPOSTします。
(Post後LambdaがSQSにエンキューします)
curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod {"MD5OfMessageBody": "be6ea76d033276891dcd884cf81a8602", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "43fd09b4-e772-55e3-98b2-47263fc9b8b2"}, "MessageId": "f0e908a3-f08d-4ea9-9d22-41d09a011e8b"} $ curl -d '{"key1":"value4","key2":"value5","key3":"value6"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod {"MD5OfMessageBody": "0593cbab7f22d4166ce2de3c2352e869", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "5ec87932-6f15-576e-9a91-37a0b7c1205a"}, "MessageId": "6c70aac1-525c-4d72-b1e4-3f891577b9d5"} $ curl -d '{"key1":"value7","key2":"value8","key3":"value9"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod {"MD5OfMessageBody": "ee7ad5a858124910f959a2823d4ceab0", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "935d4868-f9d4-5e1b-b2fd-06fa21330d55"}, "MessageId": "7ae31368-c969-4958-998f-d11ecc620651"}
数分待つと(5分ごとにLambdaがSQSをポーリングしているので)、
DynamoDBにPostしたデータがPUTされていることがわかります。