下記の記事でLambdaのScheduled Eventを使ってSQSを定期的にポーリングして
メッセージを処理する仕組みを作りました。
SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)
しかし、SQSのメッセージを一度に取得できるのは10件までで
(MaxNumberOfMessagesの最大が10)、そうなると下図のように100件の
メッセージを処理するのに1時間近くかかってしまいます。
ということで、5分毎のメッセージ処理を並列化してみました。仕組みとしては、
下図のように、5分毎に起動するLambdaファンクション(test-cron)は、
もうメッセージの処理は行わず、キュー上のメッセージの数に応じて
複数のメッセージを処理するLambdaファンクション(tset-sqs2ddb)をSNS経由で
呼び出す形としています。
全体の構成は、こんな感じになります。
上図までの経緯は、下記記事が参考になります。
- “API Gateway”のバックエンドを”Lambda”にしてJSONデータをエコーさせる
- “API Gateway → Lambda → SQS”でJSONデータをエンキューする
- SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)
- Boto3(Python)で”Service Resource”を使ってみた(Lambda)
- Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする
それでは次の三つのLambdaファンクションを準備していきます。
- test-api : API Gatewayのバックエンド。パラメーターをSQSにエンキュー。
- test-cron : 5分毎に起動。メッセージ数に応じてSNSにPublish。
- test-sqs2bdb : SNSから起動。SQSのメッセージの内容をDynamoDBにPUT。
Lambdaの設定(test-api)
コードは、こんな感じです。
import json import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) queueName = 'test' def lambda_handler(event, context): try: logger.info(event) responses = [] responses.append( boto3.resource('sqs').get_queue_by_name( QueueName = queueName ).send_message( MessageBody = json.dumps(event) ) ) logger.info(responses) return responses except Exception as e: logger.error(e) raise e
API Gatewayとの連携は下記を参考に。
“API Gateway”のバックエンドを”Lambda”にしてJSONデータをエコーさせる
Lambdaの設定(test-cron)
まずは、SNSのトピックを作成します。
Lambdaからトピックが利用できるようにするために
IAMロール(lambda_basic_execution)にポリシーをアタッチします。
準備はできました。そしてコードは、こんな感じです。
import math import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) queueName = 'test' arn = 'arn:aws:sns:ap-northeast-1:000000000000:test' def lambda_handler(event, context): try: logger.info(event) responses = [] numberOfMessages = boto3.resource('sqs').get_queue_by_name( QueueName = queueName ).attributes['ApproximateNumberOfMessages'] numberOfPublish = int(math.ceil(float(numberOfMessages) / 10)) for i in range(numberOfPublish): responses.append( boto3.resource('sns').Topic(arn).publish( Message = 'test' ) ) logger.info(responses) return responses except Exception as e: logger.error(e) raise e
Publish(test-sqs2ddbを実行)する回数(numberOfPublish)の決定は次の通りです。
(メッセージ数 / 10)の切り上げ
これはtest-sqs2ddbが一度に取得できるメッセージ数の上限が10だからです。
(SQSのAPIであるReceiveMessageの属性MaxNumberOfMessagesの上限が10)
Scheduled Eventの設定は下記を参考に。
SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)
Lambdaの設定(test-sqs2bdb)
コードは、こんな感じです。
import json import uuid import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) queueName = 'test' maxNumberOfMessages = 10 def lambda_handler(event, context): try: logger.info(event) responses = [] queue = boto3.resource('sqs').get_queue_by_name( QueueName = queueName ) messages = queue.receive_messages( MaxNumberOfMessages = maxNumberOfMessages ) entries = [] items = [] for message in messages: entries.append({ "Id": message.message_id, "ReceiptHandle": message.receipt_handle }) items.append({ "uuid": uuid.uuid1().urn, "key1": json.loads(message.body)['key1'], "key2": json.loads(message.body)['key2'], "key3": json.loads(message.body)['key3'] }) table = boto3.resource('dynamodb').Table('Test') with table.batch_writer() as batch: for item in items: batch.put_item( Item = item ) if len(entries) != 0: responses.append( queue.delete_messages( Entries = entries ) ) logger.info(responses) return responses except Exception as e: logger.error(e) raise e
SNS Eventの設定も行います。
テスト
あらかじめ次のコマンドSQSにメッセージを作成しておきます。
$ curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
手動でトピックをPublishしてみます。
するとLambda(test-sqs2bdb)が起動されDynamoDBのテーブルにデータが入ります。
次に同様に100件SQSにメッセージを作成しておきます。
手動でPublishしなくても5分毎にLambda(test-cron)が起動してPublishしているので、
やはり、DynamoDBのテーブルにデータが入ります。
下図のように、より短時間で100件のメッセージを処理していることもわかります。
今まで処理量は下図のように5分で10件でしたが、今回は5分で
全体のほとんど(80件程度)を処理してます。(まあそういう仕組みにしたんですが…)