下記の記事でLambdaのScheduled Eventを使ってSQSを定期的にポーリングして
メッセージを処理する仕組みを作りました。

SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)

しかし、SQSのメッセージを一度に取得できるのは10件までで
(MaxNumberOfMessagesの最大が10)、そうなると下図のように100件の
メッセージを処理するのに1時間近くかかってしまいます。

2015-10-10_11-19-12

ということで、5分毎のメッセージ処理を並列化してみました。仕組みとしては、
下図のように、5分毎に起動するLambdaファンクション(test-cron)は、
もうメッセージの処理は行わず、キュー上のメッセージの数に応じて
複数のメッセージを処理するLambdaファンクション(tset-sqs2ddb)をSNS経由で
呼び出す形としています。

Untitled(6) (3)

全体の構成は、こんな感じになります。

Untitled(7) (2)

上図までの経緯は、下記記事が参考になります。

それでは次の三つのLambdaファンクションを準備していきます。

  • test-api : API Gatewayのバックエンド。パラメーターをSQSにエンキュー。
  • test-cron : 5分毎に起動。メッセージ数に応じてSNSにPublish。
  • test-sqs2bdb : SNSから起動。SQSのメッセージの内容をDynamoDBにPUT。

Lambdaの設定(test-api)

コードは、こんな感じです。

import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'

def lambda_handler(event, context):

    try:
        logger.info(event)
        responses = []

        responses.append(
            boto3.resource('sqs').get_queue_by_name(
                QueueName = queueName
            ).send_message(
                MessageBody = json.dumps(event)
            )
        )

       logger.info(responses)
       return responses

     except Exception as e:
                logger.error(e)
                raise e

API Gatewayとの連携は下記を参考に。

“API Gateway”のバックエンドを”Lambda”にしてJSONデータをエコーさせる

Lambdaの設定(test-cron)

まずは、SNSのトピックを作成します。
2015-10-10_05-55-22

2015-10-10_05-57-05

2015-10-10_05-58-06

Lambdaからトピックが利用できるようにするために
IAMロール(lambda_basic_execution)にポリシーをアタッチします。

2015-10-10_06-24-10

2015-10-10_06-26-10

2015-10-10_06-27-08

準備はできました。そしてコードは、こんな感じです。

import math
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
arn = 'arn:aws:sns:ap-northeast-1:000000000000:test'

def lambda_handler(event, context):

    try:
          logger.info(event)
          responses = []

          numberOfMessages = boto3.resource('sqs').get_queue_by_name(
              QueueName = queueName
          ).attributes['ApproximateNumberOfMessages']
          numberOfPublish = int(math.ceil(float(numberOfMessages) / 10))

          for i in range(numberOfPublish):
             responses.append(
                 boto3.resource('sns').Topic(arn).publish(
                     Message = 'test'
                   )
             )

          logger.info(responses)
          return responses

     except Exception as e:
         logger.error(e)
         raise e

Publish(test-sqs2ddbを実行)する回数(numberOfPublish)の決定は次の通りです。

(メッセージ数 / 10)の切り上げ

これはtest-sqs2ddbが一度に取得できるメッセージ数の上限が10だからです。
(SQSのAPIであるReceiveMessageの属性MaxNumberOfMessagesの上限が10)

Scheduled Eventの設定は下記を参考に。

SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)

Lambdaの設定(test-sqs2bdb)

コードは、こんな感じです。

import json
import uuid
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
         logger.info(event)
         responses = []

         queue = boto3.resource('sqs').get_queue_by_name(
             QueueName = queueName
         )
        messages = queue.receive_messages(
            MaxNumberOfMessages = maxNumberOfMessages
        )

        entries = []
        items = []
        for message in messages:
            entries.append({
                "Id": message.message_id,
                 "ReceiptHandle": message.receipt_handle
            })
            items.append({
                "uuid": uuid.uuid1().urn,
                "key1": json.loads(message.body)['key1'],
                "key2": json.loads(message.body)['key2'],
                "key3": json.loads(message.body)['key3']

            })

            table = boto3.resource('dynamodb').Table('Test')
            with table.batch_writer() as batch:
                for item in items:
                    batch.put_item(
                        Item = item
                    )

            if len(entries) != 0:
                responses.append(
                    queue.delete_messages(
                        Entries = entries
                    )
                 )

            logger.info(responses)
            return responses

     except Exception as e:
         logger.error(e)
         raise e

SNS Eventの設定も行います。

2015-10-10_05-59-25

2015-10-10_06-00-44

2015-10-10_10-49-57

テスト

あらかじめ次のコマンドSQSにメッセージを作成しておきます。

$ curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod

手動でトピックをPublishしてみます。
2015-10-10_06-08-57
2015-10-10_06-10-26

するとLambda(test-sqs2bdb)が起動されDynamoDBのテーブルにデータが入ります。
2015-10-10_06-11-59

次に同様に100件SQSにメッセージを作成しておきます。
2015-10-10_11-07-57

手動でPublishしなくても5分毎にLambda(test-cron)が起動してPublishしているので、
やはり、DynamoDBのテーブルにデータが入ります。

2015-10-10_11-30-26

下図のように、より短時間で100件のメッセージを処理していることもわかります。
2015-10-10_11-27-52

今まで処理量は下図のように5分で10件でしたが、今回は5分で
全体のほとんど(80件程度)を処理してます。(まあそういう仕組みにしたんですが…)

2015-10-10_11-19-12

元記事はこちら

“Scheduled Event”の”Lambda”の処理を並列化する