この記事は何?

StepFunctionsを利用して、並行状態の入出力処理について得た知見についてこのメモにまとめます。

Step Functionsとは?

step Functions は、AWS Lambda 関数およびその他のビジネスクリティカルなアプリケーションを構築するための AWS のサービスを組み合わせることができるサーバーレスオーケストレーションサービスです。Step Functions のグラフィカルコンソールでは、アプリケーションのワークフローを一連のイベント駆動型ステップとして確認できます。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html

サーバレスでワークフローを実装することができるAWSのサービスです。特にAWSのサービスとは相性がよく、AWSのAPIの呼び出しはStepFunctionsの機能として組み込まれているため、AWSを管理するワークフローであればローコードでの実装が可能です。

類似ツールとしてはApache Airflowがあります。

使用してみた所感としては、GUIで直感的にワークフローを組み立てることができるので、少ない学習コストで利用できると感じました。

入出力処理

StepFunctionsはJSONデータを入力として受け取り、ワークフローのそれぞれの処理(状態)にてそのJSONを入力として利用します。また出力もJSONとして、次の状態に渡します。

例えば

というようなワークフローがある場合、各状態における入出力は以下の通りになります。

開始時および状態1の入力

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}

状態1の出力および状態2の入力

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state1" : "状態1の出力"    
    }        
}

plaintext:状態2の出力および終了時

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state1" : "状態1の出力",
        "state2" : "状態2の出力"    
    }        
}

上記はあくまで一例です。どのように入出力処理を制御するかもStepFunctionsを利用する上で重要になる点だと思います。

AWS Step Functions ワークフローにおける JSON の入出力処理について説明します。

並行

ワークフローの中で並行処理を実装するための部品です。

これを使うと簡単に、並行処理を実装することが可能です。
並行状態においては、枝分かれした状態の流れをブランチと呼びます。

上記の並行状態では、ブランチが2つあるということなります。

AWS Step Functions の並行状態について説明します。

各ブランチは独立したワークフローとなります。
並行状態内の全てのブランチの処理が完了(ブランチの最後の状態が完了)したら、その並行状態が完了したと見なされます。

並行状態における入出力処理

先ほど紹介したシーケンシャルなワークフロー

では、JSONが入出力としてワークフローの上から下に流れていくだけでイメージしやすいと思います。

では以下のような並行なワークフロー

こちらにおいて、入出力はどのようになるでしょうか?

並行状態の入力

並行状態への入力は各ブランチに自身(並行状態)のコピーを渡します。

Parallel 状態は各ブランチに自身の入力データ (InputPath フィールドによって変更される場合があります) のコピーを提供します。そのブランチからの出力を含む各ブランチの 1 つの要素から成る配列である出力が生成されます。すべての要素が同じタイプである必要はありません。出力配列は、通常の方法で ResultPath を使用して入力データに挿入し、全部を Parallel 状態の出力として送信できます (入力および出力処理を参照)。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-parallel-state.html#amazon-states-language-parallel-state-output

つまり状態1と状態2が開始から同じJSONを入力として受け取ることになります。

並行状態の出力

では出力はどうなるでしょうか?

並行状態からの出力は、各ブランチの出力が配列となって次の状態に渡されます。

(このことに関しては公式のドキュメントに記載なさそうです。)

並行状態における入出力のサンプル

開始時

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}

状態1の入力

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}

状態2の入力

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}

状態1と状態2の入力は並行状態に入った際に、コピーして同一のものが渡されます。

状態1の出力

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state1" : "状態1の出力"
    }        

}

状態2の出力

{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state2" : "状態2の出力"
    }        
}

終了時

[
    {
        "input":
            "state1" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
            "state2" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
        "output": {
            "state1" : "状態1の出力"
        }        
    },
    {
        "input":
            "state1" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
            "state2" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
        "output": {
            "state2" : "状態2の出力"
        }        
    }
]

並列状態の出力は上記のように、配列に各ブランチの出力が格納される形になる。もしこの並列状態の後にさらにワークフローが続くようであれば、入出力処理が煩雑になります。

並列状態の前では
json.input.state1.hoge
と指定していたところを
json[0].input.state1.hoge
といった形で配列の要素を指定する必要が出てきます。

もしワークフローに開始時のJSONに入っている値であれば、全てのブランチにコピーされるため、どの配列の要素を選択しても値は取得できます。ただ並列状態内の出力を利用したい場合は何番目のブランチであるかを選択しないと値を取得することができません。

例えば

のようなワークフローがあった場合、状態3に渡す入力が開始時に渡したJSONにあれば、json[0].input.state3.hogejson[1].input.state3.hogeどちらを選択しても状態3に入力を渡すことが可能です。

ただもし、状態2の出力を状態3に渡したいというケースにおいてはjson[1].output.state2を選択する必要があります。json[0].outputには状態1の出力のみが格納されているため)

ちなみに並行状態後の配列の順番ですが、ブランチの番号順とリンクします。

[{ブランチ1},{ブランチ2}.....{ブランチX}]
という形の配列になります。

検証

hello worldを返すシンプルなLambdaを使用して、並行状態を使用したワークフローを作成する。

lambdaの中身(python3)

import json

def lambda_handler(event, context):
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
        }),
    }

ワークフロー

state1とstate2はhello worldのlambdaを呼び出すものになっている。

ワークフローはSAMで管理しています。
https://github.com/keiusukematsuda/stepfunctions-parallel-test

では入出力がどうなっているかを確認しましょう。

以下のJSONをワークフローに入力として渡します。

{
  "input": {
    "state1": {
      "hoge": "hoge",
      "fuga": "fuga"
    },
    "state2": {
      "hoge": "hoge",
      "fuga": "fuga"
    }
  }
}

そしてワークフローを実行して、並行状態の後の出力は以下のようになります。

[
  {
    "input": {
      "state1": {
        "hoge": "hoge",
        "fuga": "fuga"
      },
      "state2": {
        "hoge": "hoge",
        "fuga": "fuga"
      }
    },
    "output": {
      "state1": {
        "ExecutedVersion": "$LATEST",
        "Payload": {
          "statusCode": 200,
          "body": "{\"message\": \"hello world\"}"
        },
        "SdkHttpMetadata": {
          "AllHttpHeaders": {
            "X-Amz-Executed-Version": [
              "$LATEST"
            ],
            "x-amzn-Remapped-Content-Length": [
              "0"
            ],
            "Connection": [
              "keep-alive"
            ],
            "x-amzn-RequestId": [
              "b156fdbc-d611-4363-ba1f-a1ddfeebf90d"
            ],
            "Content-Length": [
              "61"
            ],
            "Date": [
              "Fri, 29 Jul 2022 06:31:28 GMT"
            ],
            "X-Amzn-Trace-Id": [
              "root=1-62e37ec0-509f195236e45faf2d85a28c;sampled=0"
            ],
            "Content-Type": [
              "application/json"
            ]
          },
          "HttpHeaders": {
            "Connection": "keep-alive",
            "Content-Length": "61",
            "Content-Type": "application/json",
            "Date": "Fri, 29 Jul 2022 06:31:28 GMT",
            "X-Amz-Executed-Version": "$LATEST",
            "x-amzn-Remapped-Content-Length": "0",
            "x-amzn-RequestId": "b156fdbc-d611-4363-ba1f-a1ddfeebf90d",
            "X-Amzn-Trace-Id": "root=1-62e37ec0-509f195236e45faf2d85a28c;sampled=0"
          },
          "HttpStatusCode": 200
        },
        "SdkResponseMetadata": {
          "RequestId": "b156fdbc-d611-4363-ba1f-a1ddfeebf90d"
        },
        "StatusCode": 200
      }
    }
  },
  {
    "input": {
      "state1": {
        "hoge": "hoge",
        "fuga": "fuga"
      },
      "state2": {
        "hoge": "hoge",
        "fuga": "fuga"
      }
    },
    "output": {
      "state2": {
        "ExecutedVersion": "$LATEST",
        "Payload": {
          "statusCode": 200,
          "body": "{\"message\": \"hello world\"}"
        },
        "SdkHttpMetadata": {
          "AllHttpHeaders": {
            "X-Amz-Executed-Version": [
              "$LATEST"
            ],
            "x-amzn-Remapped-Content-Length": [
              "0"
            ],
            "Connection": [
              "keep-alive"
            ],
            "x-amzn-RequestId": [
              "7208079e-9ab3-4f7e-8608-243949f0ecb8"
            ],
            "Content-Length": [
              "61"
            ],
            "Date": [
              "Fri, 29 Jul 2022 06:31:28 GMT"
            ],
            "X-Amzn-Trace-Id": [
              "root=1-62e37ec0-0c7565263145ba8e6fea3d5a;sampled=0"
            ],
            "Content-Type": [
              "application/json"
            ]
          },
          "HttpHeaders": {
            "Connection": "keep-alive",
            "Content-Length": "61",
            "Content-Type": "application/json",
            "Date": "Fri, 29 Jul 2022 06:31:28 GMT",
            "X-Amz-Executed-Version": "$LATEST",
            "x-amzn-Remapped-Content-Length": "0",
            "x-amzn-RequestId": "7208079e-9ab3-4f7e-8608-243949f0ecb8",
            "X-Amzn-Trace-Id": "root=1-62e37ec0-0c7565263145ba8e6fea3d5a;sampled=0"
          },
          "HttpStatusCode": 200
        },
        "SdkResponseMetadata": {
          "RequestId": "7208079e-9ab3-4f7e-8608-243949f0ecb8"
        },
        "StatusCode": 200
      }
    }
  }
]

lambdaの出力を全て取得しているので、長くなってしまっていますが、入出力がブランチ数(2)の数分コピーされて、それが配列に格納されていることがわかります。

まとめ

Step Functionsの並行(Parallel)状態における入出力処理において

  • 並行状態への入力は各ブランチにコピーを渡す
  • 各ブランチの出力が配列に格納される
    • 配列に格納される順番はブランチの番号通り

StepFunctionsはあまりナレッジが落ちてないから、調べるのが大変。。

元記事はこちら

Step Functionsの並行(Parallel)状態における入出力処理
著者:
@K5K


アイレットなら、AWS で稼働するサーバーを対象とした監視・運用・保守における煩わしい作業をすべて一括して対応し、経験豊富なプロフェッショナルが最適なシステム環境を実現いたします。AWS プレミアコンサルティングパートナーであるアイレットに、ぜひお任せください。

AWS 運用・保守サービスページ

その他のサービスについてのお問合せ、お見積り依頼は下記フォームよりお気軽にご相談ください。
https://cloudpack.jp/contact/form/