下記でSQSにエンキューする部分を作ったので、
"API Gateway → Lambda → SQS"でJSONデータをエンキューする
今度はデキューする部分を作ってみます。
今まではEC2を起動して、その中で定期的にSQSをポーリングするプログラムを
実行していましたが、今ならLambdaのScheduled Eventを使うことで、
EC2無しに定期的にSQSをポーリングすることができます。
絵にすると、下記のような感じです。
(上段は前述のリンクの記事で作成したので今回は下段のLambdaを作成します)
Lambdaの設定
コードは以下のとおりです。
import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) queueUrl = 'https://sqs.ap-northeast-1.amazonaws.com/000000000000/test' maxNumberOfMessages = 10 def lambda_handler(event, context): try: logger.info(event) client = boto3.client('sqs') receiveResponse = client.receive_message( QueueUrl = queueUrl, MaxNumberOfMessages = maxNumberOfMessages ) logger.info(receiveResponse) response = receiveResponse if receiveResponse.has_key('Messages'): entries = [] for message in receiveResponse['Messages']: entries.append({ "Id": message['MessageId'], "ReceiptHandle": message['ReceiptHandle'] }) deleteResponse = client.delete_message_batch( QueueUrl = queueUrl, Entries = entries ) logger.info(deleteResponse) response = deleteResponse return response except Exception as e: logger.error(e) raise e
“receive_message”で最大10件メッセージを受け取って”delete_message_batch”で
まとめて削除しています。
タイムアウトの時間も1分に変更しています。
Lambdaのテスト
まず、キューにメッセージが存在することを確認して、
Lambdaをテストします。
下記のように二つのメッセージの削除が成功したというレスポンスが返ってきて、
{ "Successful": [ { "Id": "c820ef92-7d8d-4aa4-94a0-0843e683d158" }, { "Id": "52fa1240-bd8b-43bf-ac43-739118e613e1" } ], "ResponseMetadata": { "HTTPStatusCode": 200, "RequestId": "257f3292-d917-517b-8995-7c00dc7bbaa0" } }
実際にキューを確認すると、メッセージ数も0になっています。
Scheduled Eventの設定
上記の処理を5分ごとに実行するためにLambdaのScheduled Eventの設定を行います。
これで5分毎にSQSをポーリングし、メッセージがあれば、
最大10件まで削除するようになったはずです。
CloudWatchでの確認
最後にキューの中のメッセージの推移をCloudWatchで確認します。
60程度のメッセージが5分毎に10程度ずつ減少していることが確認できます。