はじめに

みなさん、案件の中で DynamoDB を使っている方も多いと思いますが、DynamoDB のデータ追加を検知して何らかの処理を実行したいと思ったことはありませんか? DynamoDB には DynamoDB Streams という機能があります。これを使えばデータの追加、変更、削除を検知して、変更があったデータを別の AWS サービスに渡すことができます。

今回は DynamoDB Streams、EventBridge Pipes、SQS を組み合わせて、データ追加を検知し Lambda を実行する仕組みの構築方法をご紹介します。

AWS 構成図

とある案件で DynamoDB Streams を利用していました。そのときの AWS 構成図の一部を抜粋し、案件で構築した構成をご紹介します。

まず Lambda A から DynamoDB に対してデータを追加します。DynamoDB に追加されたデータは DynamoDB Streams によって検知されます。次に、DynamoDB Streams で検知されたデータを EventBridge Pipes で受け取り、データの追加、変更、削除のうち 追加(INSERT)イベントのみをフィルタリングします。EventBridge Pipes によってフィルタリングされたデータは SQS に送信されます。最後に SQS から Lambda B にデータが送信され、最大同時実行数が 10 に制限された状態で Lambda B が実行されます。

それでは各サービスの役割と設定手順を説明します。

DynamoDB Streams の役割、設定手順

DynamoDB Streams では DynamoDB のデータの追加、変更、削除を検知できます。また、変更前および変更後のデータキャプチャを後続サービスに送信できます。今回はデータの追加時に追加されたデータを後続の Lambda で処理したいので、「新しいイメージ」(変更された後のデータ)を取得するように設定します。

1.DynamoDB のテーブルを開き、「エクスポートおよびストリーム」タブをクリックします。

2.「DynamoDB ストリームの詳細」の「オンにする」をクリックします。

3.「新しいイメージ」を選択し、「ストリームをオンにする」をクリックします。これで DynamoDB Streams が有効になり、変更された後のデータが後続のサービスに送信されるようになります。

EventBridge Pipes の役割、設定手順

EventBridge Pipes では DynamoDB Streams からデータキャプチャを受け取りフィルタリングを行うことができます。DynamoDB Streams には INSERT、MODIFY、REMOVE の 3 種類のイベントがありますが、今回はデータ追加のみを検知するために INSERT イベントのみを抽出するように設定します。

1.Amazon EventBridge を開き、「使用を開始する」の「EventBridge パイプ」を選択し、「パイプを作成」をクリックします。

2.パイプ名を入力します。

3.「ソースを選択」から「DynamoDB」を選択します。

4.「DynamoDB ストリーム」からテーブル名を選択します。

5.次に「フィルタリング」をクリックしてフィルタリングの設定をします。

6.下のほうまでスクロールし、「イベントパターン 1」に以下のコードを貼り付けます。このコードによりデータの追加イベントのみがターゲットに送信されるようになります。

{
  "eventName": [{
    "equals-ignore-case": "INSERT"
  }]
}


7.次に「ターゲット」をクリックしてターゲットの設定をします。

8.ターゲットサービスの「選択」から「SQS キュー」を選択します。

9.「キュー」から SQS キューを選択します。

10.最後に「パイプを作成」をクリックして EventBridge Pipes を作成します。

SQS の役割、設定手順

SQS では EventBridge Pipes からデータを受け取り、Lambda に渡す前のバッファとして利用することができます。今回は SQS から Lambda をトリガーするときに最大同時実行数を制限する要件があったので、Lambda の設定により制限をかけます。

1.Lambda 関数を開き、「トリガーを追加」をクリックします。

2. 「ソースを選択」から「SQS」を選択します。

3. SQS キューを選択します。

4. 「Maximum concurrency」(最大同時実行数)に「10」と入力します。これにより SQS から Lambda をトリガーしたときの Lambda の最大同時実行数が 10 に制限されます。

5. 最後に「追加」をクリックして Lambda のトリガーとして SQS を追加します。

これで一連の環境構築が完了します。

まとめ

今回は DynamoDB Streams、EventBridge Pipes、SQS を組み合わせることで、DynamoDB のデータ追加を検知して Lambda を実行する仕組みの構築方法をご紹介しました。Lambda から DynamoDB にわざわざポーリングをしなくてもデータの追加を検知できるのはすごく便利ですね!もし業務で似たような状況に直面した際はぜひ DynamoDB Streams を活用してみてください!