並行処理は難しい

AWS SESなどを使用して大量のメールを送信する場合、1通ずつ送信していると時間がかかってしまいます。そういった時に、例えば以下のような並行処理で実装することが考えられます。

import boto3
from concurrent.futures import ThreadPoolExecutor

def send_email(email_data):
    ses_client = boto3.client('ses')
    try:
        response = ses_client.send_email(
            Source=email_data['from'],
            Destination={
                'ToAddresses': [email_data['to']]
            },
            Message={
                'Subject': {
                    'Data': email_data['subject']
                },
                'Body': {
                    'Text': {
                        'Data': email_data['body']
                    }
                }
            }
        )
        return response
    except Exception as e:
        print(f"Error sending email: {str(e)}")
        return None

def lambda_handler(event, context):
    emails = event['emails']  # メール送信データのリスト
    max_workers = 10  # 同時実行数

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(send_email, emails))

    return results

ただ並行処理の実装は結構難しく、

  • エラーハンドリングが複雑になりがち
  • スレッドセーフな実装を心がける必要がある
  • パフォーマンスがハードウェアのスペックに影響されやすい

などの懸念点があります。

StepFunctionsのMapステートを使用して並列稼働しよう

Mapステートを使用してメールを1通送信するLambda関数を同時実行すれば、上記の並行処理と同じような処理が実行できます。

StepFunction

このケースはSQSからメール送信内容を複数受け取ってそれを同時に送信します。
ポイントはSESを使って送信するので秒間リクエストに引っかからないように同時実行数に制限をかけていること、Lambdaの処理が終わった後1秒待機しているところです。
これで秒間リクエスト制限に引っかからないようにしています(1日の送信制限については割愛いたします。StepFunctionで変数定義してカウントアップ、上限超えたら停止、のような処理を入れると思います)。

import boto3

def lambda_handler(event, context):
    # SESクライアントの初期化
    ses_client = boto3.client('ses')

    try:
        # イベントからメール送信に必要なデータを取得
        email_data = event['email']

        # メール送信
        response = ses_client.send_email(
            Source=email_data['from'],
            Destination={
                'ToAddresses': [email_data['to']]
            },
            Message={
                'Subject': {
                    'Data': email_data['subject']
                },
                'Body': {
                    'Text': {
                        'Data': email_data['body']
                    }
                }
            }
        )

        return {
            'statusCode': 200,
            'body': 'メール送信成功',
            'messageId': response['MessageId']
        }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': f'メール送信エラー: {str(e)}'
        }

ここで一つ問題があります。
Mapステートの処理モードをデフォルトのインラインモードにしていますが、一つのLambdaが失敗した場合、Mapステート全体が失敗したとみなされます。
エラーを握り潰してとにかくステータス200で返す、というようなことも可能ではありますが、このコードをエラー処理を正しく行いたい場合はStepFunctionsのエラー処理で行います。

StepFunctionsはエラー処理も直感的

Lambdaのエラー処理をStepFunctionsで行うにはエラー処理タブを選択します。
再試行者(Retrier)で再試行の設定を、キャッチャー(Catcher)でフォールバックで遷移するステートの指定ができます(コンソールの日本語が変ですね)。

StepFunction Retrier

再試行者(Retrier)ではエラー処理を行うエラーの種類の選択、再試行を行うインターバル、再試行回数などが指定できます(エクスポネンシャルバックオフの指定も出来ます)。

StepFunction Catcher

キャッチャー(Catcher)ではフォールバック時に遷移するステートを選ぶことができます。

この辺りをLambdaで実装しようとするとそれなりに面倒なのですが、GUIで簡単に指定できるのはいいですね!

まとめ

いかがでしたでしょうか?今回はメール送信の並行処理というケースのお話でしたが、StepFunctionsが上手くハマってとてもシンプルな実装になりました!気持ちいいですね!

ぜひ皆さんも色々試してみて下さい!