はじめに

この記事の内容は、2025年2月時点の以下記事を参考に作成しています。
https://qiita.com/poruruba/items/a22eec371a3e993c47d1
https://note.shiftinc.jp/n/n929fbf3ff4e7
https://ma-vericks.com/blog/cognito-email-mfa/
https://note.shiftinc.jp/n/naba17a726484

概要

Amazon Cognitoのユーザ認証で、パスワードでの認証成功後にワンタイムパスワード認証を追加した、二段階認証を行う。
ワンタイムパスワードは、パスワード検証成功後にユーザにメールで通知する。

カスタム認証チャレンジは、以下トリガーで実行されるAWS Lambdaで認証に関する動作を定義する。

認証チャレンジの定義
パスワード検証の開始、カスタム認証フローの開始・終了を定義する。
認証チャレンジの作成
ユーザに提示する認証方法と、認証方法に対する回答を定義する。
ワンタイムパスワード認証を行う場合、認証方法がワンタイムパスワードであることと、ユーザが返すべき一時認証コード(回答)を定義する。
認証チャレンジレスポンスの検証
ユーザの回答から、カスタム認証チャレンジの成功・失敗を定義する。

実装

今回の動作検証では、以下の流れとなった。

  1. クライアントアプリ(ユーザ)がサインインの開始をリクエスト。
  2. 「認証チャレンジの定義」でパスワード検証の開始。
  3. クライアントアプリがパスワード検証をリクエスト。
  4. パスワード検証成功後、「認証チャレンジの定義」でカスタム認証フローの開始。
  5. 「認証チャレンジの作成」でワンタイムパスワードでの認証プロセスを定義。
  6. クライアントアプリがメールで通知された一時認証コードで検証をリクエスト。
  7. 「認証チャレンジレスポンスの検証」でユーザが送信した一時認証コードの検証結果を定義。
  8. ワンタイムパスワード認証プロセス完了後、認証チャレンジの完了を定義。
  9. クライアントアプリに認証トークンがレスポンスされる。

1. クライアントアプリ(ユーザ)がサインインの開始をリクエスト。
クライアントアプリは Python で実装。
SRP_A の算出は warrant.aws_srp を使用。

cognito = boto3.client('cognito-idp', region_name=pool_region)
challenge_name = 'SRP_A'
cognito.initiate_auth(
 AuthFlow='CUSTOM_AUTH',
 AuthParameters={
  'USERNAME': user_name,
  'CHALLENGE_NAME': challenge_name,
  'SRP_A': srp_a,
 },
 ClientId=client_id,
)

2. 「認証チャレンジの定義」でパスワード検証の開始。
AWS Lambda は Python で実装。
起動される AWS Lambda に渡されるパラメータに session があり、
このリストに認証チャレンジの詳細が時系列に設定されている。
session[0] には、最初のチャレンジ内容が設定されている。
認証チャレンジの定義の Lambda トリガー
サインイン開始時の session は、以下が設定されていた。

"session": [
 {
  "challengeName": "SRP_A",
  "challengeResult": true,
  "challengeMetadata": null
 }
]

このリクエストに対し、パスワード検証を開始するレスポンス設定を行う。

# リクエストの session 配列が1要素の場合
if session_length == 1:
 if session[0]['challengeName'] == 'SRP_A' and session[0]['challengeResult']:
  event['response']['challengeName'] = 'PASSWORD_VERIFIER'
  event['response']['issueTokens'] = False
  event['response']['failAuthentication'] = False

3. クライアントアプリがパスワード検証をリクエスト。
クライアントアプリの最初のリクエスト initiate_auth で、セッション情報やパスワード検証用の値を取得する。

"ChallengeName": "PASSWORD_VERIFIER",
"Session": "XXXXXXXXXXXXXXXXXXXX",
"ChallengeParameters": {
 "SALT": "XXXXXXXXXXXXXXXXXXXX",
 "SECRET_BLOCK": "XXXXXXXXXXXXXXXXXXXX",
 "SRP_B": "XXXXXXXXXXXXXXXXXXXX",
 "USERNAME": "XXX",
 "USER_ID_FOR_SRP": "XXXX"
},

この戻り値を使用し、パスワード検証をリクエスト。

challenge_name = 'PASSWORD_VERIFIER'
cognito.respond_to_auth_challenge(
 ClientId=client_id,
 Session=Session,
 ChallengeName=challenge_name,
 ChallengeResponses=challenge_response
)

4. パスワード検証成功後、「認証チャレンジの定義」でカスタム認証フローの開始。
パスワードの検証が成功すると、session にそのチャレンジ内容が設定されたパラメータが Lambda に渡される。
パスワードの検証が失敗すると Lambda は呼び出されない。

"session": [
 {
  "challengeName": "SRP_A",
  "challengeResult": true,
  "challengeMetadata": null
 },
 {
  "challengeName": "PASSWORD_VERIFIER",
  "challengeResult": true,
  "challengeMetadata": null
 }
]

2つ目のチャレンジがパスワード検証で、リクエストユーザが二段階認証を設定している場合に、カスタム認証フローを開始するレスポンス設定を行う。
次の「認証チャレンジの作成」も同じ session 情報となる為、
トリガーソース(triggerSource)が認証チャレンジの定義(DefineAuthChallenge_Authentication)の条件で区別する。

if session_length == 2:
 if session[1]['challengeName'] == 'PASSWORD_VERIFIER' and session[1]['challengeResult']:
  # 認証チャレンジの定義であること
  if event['triggerSource'] == 'DefineAuthChallenge_Authentication':

   # DB 上に登録されているユーザ情報を取得。二段階認証を行うユーザかを判定
   user_record = get_user_record()

   if user_record['two_step_auth']:
    # 二段階認証を行う場合
    event['response']['challengeName'] = 'CUSTOM_CHALLENGE'
    event['response']['issueTokens'] = False
    event['response']['failAuthentication'] = False
   else:
    # パスワード検証のみの場合。認証チャレンジを完了する
    event['response']['issueTokens'] = True
    event['response']['failAuthentication'] = False

5. 「認証チャレンジの作成」でワンタイムパスワードでの認証プロセスを定義。
リクエストの session は 4. と同じ値。
triggerSource が CreateAuthChallenge_Authentication かどうかで区別する。
レスポンスにチャレンジ名(ワンタイムパスワードであること)とそれに対する回答(一時認証コード)を設定する。
一時認証コードはユーザにメール通知し、
ユーザ側は提示されたチャレンジ名に対する回答として一時認証コードを送信する。

if session_length == 2:
 if session[1]['challengeName'] == 'PASSWORD_VERIFIER' and session[1]['challengeResult']:
  # 認証チャレンジの作成であること
  if event['triggerSource'] == 'CreateAuthChallenge_Authentication':

   # 一時認証コード生成
   one_time_auth_code = get_random()
   # 一時認証コードをメール通知
   send_mail_to_user(from_mail, to_mail, subject, mail_body)

   # チャレンジ名と一時認証コードをレスポンスに含める
   # challenge はユーザに提示される
   # answer の一時認証コードは、次の「認証チャレンジレスポンスの検証」のリクエストに含まれる
   event['response']['publicChallengeParameters'] = {}
   event['response']['publicChallengeParameters']['challenge'] = 'one-time-auth-code'
   event['response']['privateChallengeParameters'] = {}
   event['response']['privateChallengeParameters']['answer'] = one_time_auth_code

6. クライアントアプリがメールで通知された一時認証コードで検証をリクエスト。
パスワード検証後のレスポンスに Lambda トリガー側で設定したチャレンジ内容が含まれている。

"ChallengeName": "CUSTOM_CHALLENGE",
"ChallengeParameters": {
 "challenge": "one-time-code"
}

チャレンジ内容が認証コードの検証(one-time-code)の場合に、メールで通知された一時認証コードを送信する。

challenge_name = 'CUSTOM_CHALLENGE'
one_time_code = 'XXXXXXX'
cognito.respond_to_auth_challenge(
 ClientId=client_id,
 ChallengeName=challenge_name,
 Session=Session,
 ChallengeResponses={'ANSWER': one_time_code, 'USERNAME': user_name}
)

7. 「認証チャレンジレスポンスの検証」でユーザが送信した一時認証コードの検証結果を定義。
リクエストに session は含まれない。
「認証チャレンジの作成」で定義した一時認証コードと、ユーザが送信した一時認証コードが設定されている。

"privateChallengeParameters": {
 "answer": "XXXXXXXX"
},
"challengeAnswer": "XXXXXXXX"

レスポンスに検証結果を設定する。

# session が未設定の場合
if 'session' not in event['request']:
 # 認証チャレンジの検証の場合
 if event['triggerSource'] == 'VerifyAuthChallengeResponse_Authentication':

  # 検証
  if event['request']['privateChallengeParameters']['answer'] == event['request']['challengeAnswer']:
   # 認証成功
   event['response']['answerCorrect'] = True
  else:
   # 認証失敗
   event['response']['answerCorrect'] = False

8. ワンタイムパスワード認証プロセス完了後、認証チャレンジの完了を定義。
リクエストの session に「認証チャレンジレスポンスの検証」で設定した結果が含まれる。

"session": [
 {
  "challengeName": "SRP_A",
  "challengeResult": true,
  "challengeMetadata": null
 },
 {
  "challengeName": "PASSWORD_VERIFIER",
  "challengeResult": true,
  "challengeMetadata": null
 },
 {
  "challengeName": "CUSTOM_CHALLENGE",
  "challengeResult": true,
  "challengeMetadata": null
 }
]

3つ目のチャレンジ結果が成功の場合にトークンを発行する。

if session_length == 3:
 if session[2]['challengeResult'] == True:
  # 成功の場合。トークンの発行
  event['response']['issueTokens'] = True
  event['response']['failAuthentication'] = False
 else:
  # 失敗の場合
  event['response']['issueTokens'] = False
  event['response']['failAuthentication'] = True

最後に

最初は実装するのが難しいと感じていたが、
処理の流れをある程度把握できてからは、任意のタイミングでエラーを発生させる、トークンを発行する等、各動作の実装が容易にできると感じた。
また Lambda 関数内で、システムの DB や恐らく Amazon S3 にもアクセスできるため、もう一歩踏み込んだ対応ができるのではと思った。