はじめに

AWS Step Functionsでは、2024年にクエリ言語としてJSONataがサポートされました。従来のJSONPath($.variable形式)に代わり、より柔軟で強力な式が書けるようになっています。
本記事では、SQSキューのメッセージを全件処理するステートマシン定義を題材に、JSONata構文の検証で得られた知見をまとめます。

全体のフロー

このステートマシンは、SQSキューからメッセージを取得し、1件ずつJSON化してS3に配置した後、SQSからメッセージを削除する処理をキューが空になるまでループします。

[SQSからメッセージを取得 (SDK統合)]
        ↓
[メッセージの有無を確認 (Choice)] 
   ├─ (メッセージあり)         
   │     ↓
   │  [メッセージ処理 (Map)]
   │     └─ [JSONファイル化して配置 (Lambda)]
   │           ↓
   │        [メッセージ削除 (SQS SDK統合)]─────↑(処理完了後SQSからメッセージを取得に戻る)
   │
   └─ (メッセージなし)
         ↓
      [処理終了]

JSONata対応のポイント

1. QueryLanguageの宣言

ステートマシン定義のトップレベルでQueryLanguageJSONataに設定します。

{
  "QueryLanguage": "JSONata",
  "Comment": "SQSキューのメッセージを全て処理し、JSONファイルをS3に配置する",
  "StartAt": "SQSからメッセージを取得"
}

検証結果:QueryLanguageを指定しない場合、デフォルトはJSONPathになります。JSONata構文を使うには必ずトップレベルで宣言が必要です。

2. AWS SDK統合とArguments

SQSの操作はAWS SDK統合を使い、`Arguments`でパラメータを渡します。
"SQSからメッセージを取得": {
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
  "Arguments": {
    "QueueUrl": "${queue_url}",
    "MaxNumberOfMessages": 5,
    "VisibilityTimeout": 300
  },
  "Next": "メッセージの有無を確認"
}

検証結果:

  • 従来のJSONPathモードでは`Parameters`を使っていましたが、JSONataモードでは`Arguments`に変わります
  •  AWS SDK統合のResourceは`arn:aws:states:::aws-sdk:{サービス}:{API}`の形式で、JSONataモードでも同じです
  •  `${queue_url}`はTerraformの`templatefile`によって実際のSQS URLに置換されます(JSONata式ではなくTerraform変数)

3. Choice(条件分岐)でのCondition

従来のVariable+比較演算子ではなく、ConditionにJSONata式を直接記述します。

"メッセージの有無を確認": {
  "Type": "Choice",
  "Choices": [
    {
      "Condition": "{% $states.input.Messages != null %}",
      "Next": "メッセージ処理"
    }
  ],
  "Default": "処理終了"
}

従来のJSONPathでの書き方との比較:

// JSONPath(従来)
{
  "Variable": "$.Messages",
  "IsPresent": true,
  "Next": "メッセージ処理"
}

// JSONata(今回)
{
  "Condition": "{% $states.input.Messages != null %}",
  "Next": "メッセージ処理"
}

検証結果:

  •  `Condition`にJSONata式を書くことで、条件を一つの式で表現できます
  •  `$states.input`は現在のステートへの入力データ全体を参照します
  •  SQSの`receiveMessage`はメッセージがない場合`Messages`フィールド自体を返さないため、`!= null`でチェックしています

4. Map stateでのItemsと配列展開

Map stateで処理する配列をItemsにJSONata式で指定します。

"メッセージ処理": {
  "Type": "Map",
  "Items": "{% $states.input.Messages[] %}",
  "ItemProcessor": { ... },
  "Next": "SQSからメッセージを取得"
}

検証結果:

  •  従来の`ItemsPath`(JSONPath)の代わりに`Items`にJSONata式を指定します
  •  `Messages[]`の`[]`はJSONataの配列展開演算子で、配列の各要素を個別に返します
  •  Map stateの`Next`を「SQSからメッセージを取得」に設定することで、**キューが空になるまでループする**仕組みを実現しています
  • `MaxConcurrency`を指定していないため、デフォルトの`0`(無制限)が適用され、**取得した複数メッセージが並列で処理されます**。`receiveMessage`で最大5件を一度に取得するため、最大5件が同時に処理される形になります。Lambdaの同時実行やSQSのスループットに応じて`MaxConcurrency`を設定すれば、並列数を制御することも可能です

5. OutputによるLambda結果の加工

Lambda実行後、Outputで出力データを整形できます。

"JSONファイル化して配置": {
  "Type": "Task",
  "Resource": "${lambda_put_json_arn}",
  "Arguments": {
    "Body": "{% $states.input.Body %}"
  },
  "Output": {
    "ReceiptHandle": "{% $states.input.ReceiptHandle %}"
  },
  "Next": "メッセージ削除 (SQS)"
}

検証結果:

  • `Output`は従来の`ResultSelector`に相当します – Lambda関数には`Body`(SQSメッセージの本文)だけを渡し、Lambda実行後の出力には`ReceiptHandle`を含めています
  • これにより、次の「メッセージ削除」ステートで`ReceiptHandle`を利用できます
  • **入力の一部を出力に引き継ぐ**パターンとして、`Output`内で`$states.input`を参照するのがポイントです

6. SQSメッセージの削除(SDK統合)

処理済みメッセージをSQSから削除します。

"メッセージ削除 (SQS)": {
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
  "Arguments": {
    "QueueUrl": "${queue_url}",
    "ReceiptHandle": "{% $states.input.ReceiptHandle %}"
  },
  "End": true
}

検証結果:

  • 前のステートの`Output`で`ReceiptHandle`を出力に含めたため、`$states.input.ReceiptHandle`でアクセスできます
  • SDK統合でSQSのdeleteMessageを直接呼び出すことで、Lambda関数を別途用意する必要がありません —

Terraformとの連携

このステートマシン定義はTerraformのtemplatefile関数でデプロイされます。

resource "aws_sfn_state_machine" "example" {
  name     = "example_stepfunctions"
  role_arn = aws_iam_role.example.arn # Step Functionsに紐づけるIAMロール

definition = templatefile("${path.module}/step_functions/example_jsonata.json", {
    queue_url           = aws_sqs_queue.example.url,
    lambda_put_json_arn = aws_lambda_function.put_json.arn # Json化して、S3に配置するlambda関数のARN
  })
}

注意点:JSON内の${queue_url}${lambda_put_json_arn}はTerraformの変数置換であり、JSONataの{% %}式とは別の仕組みです。混同しないように注意してください。

JSONPath vs JSONata 比較まとめ

項目 JSONPath(従来) JSONata
入力参照 `$.variable` `{% $states.input.variable %}`
Lambda引数 `Parameters` `Arguments`
結果の加工 `ResultSelector` `Output`
結果の格納 `ResultPath` `Assign`
入力フィルタ `InputPath` 不要(JSONata式で直接制御)
出力フィルタ `OutputPath` 不要(JSONata式で直接制御)
条件分岐 `Variable` + 比較演算子 `Condition` + JSONata式
Mapの対象 `ItemsPath` `Items`

検証: CSVファイルをS3に配置してSQS経由で処理する

ここでは、実際にCSVデータをS3に配置し、SQSにメッセージを送信してStep Functionsで処理する手順を紹介します。

テストデータ(CSV)

以下のようなCSVファイル test1.csv を用意します。

JSON化したあとの結果

[ 
  { 
    "製品名": "テスト1", 
    "縦": 10, 
    "横": 5, 
    "幅": 3
  }, 
  { 
    "製品名": "テスト2",
    "縦": 20,
    "横": 10,
    "幅": 3
  }, 
  { "製品名": "テスト3",
    "縦": 30,
    "横": 15,
    "幅": 9 
  }
]

手順1: S3にCSVファイルをアップロード

AWS CLIを使ってS3バケットにCSVファイルをアップロードします。

aws s3 cp test1.csv s3://{バケット名}/csv/test1.csv

本環境では、Terraformで以下のように定義されたS3バケットを使用しています。

resource "aws_s3_bucket" "csv" {
  bucket = "csv_bucket"
}

手順2: SQSにメッセージを送信

S3に配置したCSVファイルの情報をSQSメッセージとして送信します。本環境ではFIFOキューを使用しているため、--message-group-idの指定が必要です。

aws sqs send-message \
--queue-url https://sqs.{リージョン}.amazonaws.com/{アカウントID}/{キュー名}.fifo \
--message-body '{
"bucket": "{バケット名}",
"key": "csv/test1.csv",
"file_name": "test1.csv"
}' \
--message-group-id "csv-import"

SQSキューはTerraformで以下のように定義されています。

resource "aws_sqs_queue" "example" {
  name                        = "example-sqs-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

FIFOキューのポイント:

  • `content_based_deduplication = true` により、メッセージ本文のハッシュで重複排除が自動的に行われます
  • 同じ内容のメッセージを短時間に2回送信しても、1回だけ処理されます
  • `–message-group-id` でメッセージグループを指定し、グループ内の順序が保証されます

手順3: Step Functionsの実行

SQSにメッセージが入った状態でStep Functionsを実行します。

aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:{リージョン}:{アカウントID}:stateMachine:{ステートマシン名} \
--input '{}'

実行すると、example_jsonata.jsonで定義したフローに沿って以下の処理が行われます。

1. SQSからメッセージを取得 — `receiveMessage`でCSVファイル情報を含むメッセージを取得
2. メッセージの有無を確認 — JSONata式`{% $states.input.Messages != null %}`で判定
3. メッセージ処理(Map) — 取得したメッセージごとにLambdaでJSONファイル化
4. メッセージ削除 — 処理済みメッセージをSQSから削除
5. ループ — SQSにメッセージが残っていれば再度取得(キューが空になるまで繰り返し)

処理の流れ(図解)

[S3] test1.csv をアップロード
↓
[SQS] {"bucket": "...", "key": "csv/test1.csv"} を送信
↓
[Step Functions] 実行開始
↓
[SQS receiveMessage] メッセージを取得
↓
[Lambda] Body(CSVファイル情報)を受け取りJSON化してS3に配置
↓
[SQS deleteMessage] 処理済みメッセージを削除
↓
[ループ] キューが空になるまで繰り返し
↓
[処理終了]

設計パターンまとめ

本ステートマシンで活用されているJSONataの設計パターンをまとめます。

パターン1: SQSポーリングループ

Map stateのNextをSQS受信ステートに戻すことで、キューが空になるまで自動ループします。Lambda関数でのポーリング実装が不要になり、Step Functions側で制御できるのが利点です。

パターン2: Outputによる入力引き継ぎ

Lambda実行後、Lambdaの結果ではなく入力データの一部(ReceiptHandle)を次のステートに渡すパターンです。Output内で$states.inputを参照することで実現しています。

おまけ: Assignによる変数の格納

JSONataモードではAssignを使うことで、ステートの処理結果や任意の値を変数に格納し、後続のステートから参照できます。

基本的な使い方

例えば、Passステートで変数を初期化する場合

"変数を初期化": {
  "Type": "Pass",
  "Assign": {
    "import_code": "example",
    "import_id": 1
  },
  "Next": "次のステート"
}

 

Assignで定義した変数は、以降のどのステートからでも{% $import_code %}のように参照可能です。

Taskステートでの活用

Taskステート(Lambda実行やSDK統合)でもAssignが使えます。Lambda実行結果の一部を変数に格納しつつ、次のステートに処理を渡すことができます。

"データを取得": {
  "Type": "Task",
  "Resource": "${lambda_arn}",
  "Arguments": {
    "id": "{% $import_id %}"
  },
  "Assign": {
    "result_count": "{% $states.result.count %}",
    "fetched_items": "{% $states.result.items %}"
  },
  "Next": "次のステート"
}

 

  •  `$states.result` でLambdaの実行結果を参照できます
  •  必要な値だけを変数に取り出せるため、後続ステートで扱うデータがシンプルになります

JSONPathとの比較

方式 変数格納の方法 参照の方法
JSONPath(従来) `ResultPath: “$.myVar”` で入力JSONに結果をマージ `$.myVar`
JSONata `Assign: { “myVar”: “{% … %}” }` で変数に格納 `{% $myVar %}`

JSONPathでは入力JSONのツリー構造に結果を埋め込む形だったため、ネストが深くなりがちでした。JSONataのAssignは独立した変数として管理できるため、直感的でわかりやすくなっています。

まとめ

JSONataの検証を通じて、以下の知見が得られました。

1. JSONata式は`{% %}`で囲む — `$states.input`で入力を参照し、直感的にデータ操作ができる
2. Outputで出力を自由に整形 — Lambda結果に依存せず、入力データの引き継ぎが簡単
3. Choiceの条件式がシンプル — `Condition`一つで複雑な条件も表現できる
4. SDK統合との相性が良い — SQS操作をLambdaなしで直接実行でき、Argumentsで柔軟にパラメータ指定
5. Mapステートの並列処理できる — バッチ処理で時間短縮できる

JSONataの採用により、ステートマシン定義が従来のJSONPathに比べてシンプルかつ読みやすくなります。新規のStep Functions構築にはJSONataの採用をおすすめします。