はじめに

Amazon Athena (以下、Athena) は、Amazon S3上の膨大なデータに対して直接SQLを実行し、素早く分析やデータ加工を行えるサーバーレスなクエリサービスとして、広く利用されています。

しかし、その処理が非同期であることから、API の呼び出しは複雑です。
プログラムで処理を行おうとする場合、コードが複雑化して、バグを内包する可能性が高まります。

最悪の場合、Out of Memory (OOM) による停止や、不適切な処理による AWS SDK の過剰なコールなどのバグが発生することも考えられます。

この記事では、この処理を簡素化するためのテクニックについて紹介します。
複雑な処理をライブラリにカプセル化することで、利用側では『たった1行で Athena を安全に利用可能』となります。

使用言語は TypeScript です。

Athena の実行フロー

AWS SDK を利用して、Athena でクエリを実行する場合のフローは、以下のとおりです。

大きく分類すると、3ブロックに分かれます。

  1. StartQueryExecution
    1. SQL を渡して処理を実行する。
  2. GetQueryExecution
    1. クエリの実行状態を取得します。
    2. 処理が終わっていない場合、sleep を挟んで遅延させます。
  3. GetQueryResults
    1. クエリ結果を順次取得します。
    2. NextToken を利用して、ブロックごとに取得します。

このように、必要なコマンドが分かれるとともに、正しく処理する必要があるため、コードが複雑化しやすいです。

また、メモリについても考慮が必要です。
Athena は大規模データを取り扱っています。
オンメモリでデータをキャッシュすると、OOM となりプログラムが終了してしまう事も考えられます。

TypeScript のテクニック

このような処理を、TypeScript で取り扱う場合はいくつかのテクニックが必要です。

代表的なテクニックは以下のとおりです。

AsyncIterator

AsyncIterator は、非同期かつ、Iterable (反復可能) な処理を簡易的に表記できます。

for await (const value of iterator) {
    // [value を処理]
}

記述することで、以下の問題を解決できます。

  1. 遅延処理の適切な待機
    1. for await は、AsyncIterator を処理するためのキーワードです。
    2. この記述により、データ取得が非同期であるデータを簡潔に記述できます。
  2. 逐次処理
    1. メモリ確保は、Iterable により 1件ごと Value にコピーされます。
    2. この処理方法により、巨大なデータを、有限のメモリで処理できます。
  3. 安全な処理
    1. value は const として定義されるため、代入によるバグが発生しません。
    2. Type 安全です。

今回の例で言えば、複雑な Athena の結果取得をカプセル化して 1行ずつ (Row データ) として処理することが可能となります。

AsyncGenerator

AsyncGenerator は、シーケンシャルで逐次実行可能な処理を簡易的に記述できます。

今回の場合で考えると、SQL 発行から結果の取得はシーケンシャルで逐次実行処理です。
そのため、今回の処理と相性が良いです。

AsyncGenerator を利用することで、StartQueryExecution を実行し、GetQueryExecution で待機、GetQueryResults で結果取得という一連の処理を一つのコードで記述できます。

AsyncGenerator は、AsyncIterator でもあるため、そのまま AsyncIterator として利用可能です。

async function* () {
    yield [data1]
    yield [data2]
    yield [data3]
}

TypeScript には、巨大なデータを取り扱う方法として Stream があります。
Stream は、取り扱うデータが Byte ストリームであることや、Async 関数と相性が良くないことから、AsyncGenerator が適切です。

AsyncGenerator Readable (Stream)
データの種類 オブジェクト Byte ストリーム
定義方法 async function* (Generator) Readable クラスの拡張
データの処理 yield で一時停止 _readpush

最適化された Athena での SQL 実行

これらの処理を組み込んだ場合、上述の複雑なフローを劇的に簡略化できます。

利用する側では for await で処理を実行するだけです。
これは、可視性も高く、処理が圧倒的に簡略化されるため、バグが起こりにくくなります。

具体的なコードは、以下のようになります。

import { execute } from "./lib"

const main = async () => {
    const sql = `...`
    for await (const row of execute(sql)) {
        // 処理
    }
}

ほぼ全ての処理をライブラリ側に押し込めて、簡略化することができました。
ライブラリ化したコードは、記事下部に載せます。

これにより、大規模なデータを安全に取り扱うことが可能となります。

ライブラリ側には、以下のような機能を押し込めています。

  • QueryExecutionId のカプセル化
    • Athena のクエリ追跡用の ID をライブラリ内にカプセル化しています。
    • ID を意識することなく、結果の取得に注力できます。
  • クエリ実行の待機
    • StartQueryExecution 実行後、結果が返ってくるまで sleep(1000) で待機します。
    • FAILEDCANCELLED の場合、安全に例外で処理を行っています。
    • 実行側で意識することなく、遅延と待機処理をしています。
  • 安全な結果の取得
    • 大規模なデータセットに対して、 1行ずつ yield で処理を返しています。
    • API からバッチ的に結果が返ってきても、呼び出し側には 1行ずつ処理を返しています。

まとめ

TypeScript の AsyncGenerator を活用することで、Athena の複雑な非同期実行フローを劇的に簡略化しました。
また、大規模データ処理におけるバグの可能性とメモリ負荷を軽減できます。

このパターンは、AWS SDK を利用する他の非同期処理にも応用可能です。
Amazon S3 への大規模ファイルアップロード、Amazon DynamoDB からのスキャン結果の逐次取得など、AsyncGenerator が活躍する状況は考えられます。

ぜひ、ご自身のプロジェクトで安全かつ簡潔なデータ処理を実現してください。

ライブラリ全文

/// Copyright 2026 Shirousa
/// Licensed under the [MIT License](https://opensource.org/license/mit)

import { AthenaClient, GetQueryExecutionCommand, GetQueryResultsCommand, StartQueryExecutionCommand, type GetQueryResultsCommandOutput, type Row} from "@aws-sdk/client-athena"

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms))

export interface Option {
    workGroup?: string
}
export async function* execute(sql: string, opt: Option = {}) {
    const client = new AthenaClient()

    // send execute
    const exec = await client.send(new StartQueryExecutionCommand({
        WorkGroup: opt.workGroup ?? "primary",
        QueryString: sql,
    }))

    // wait finish
    let wait = true
    do {
        await sleep(1000)
        const res = await client.send(new GetQueryExecutionCommand({
            QueryExecutionId: exec.QueryExecutionId,
        }))
        const state = res.QueryExecution?.Status?.State
        if (state === "SUCCEEDED") {
            wait = false
        }
        else if (state === "FAILED" || state === "CANCELLED") {
            throw new Error(`Query execution ${state}`);
        }
    } while (wait)

    // get datas
    let token: string | undefined = undefined
    do {
        const res = await client.send(new GetQueryResultsCommand({
            QueryExecutionId: exec.QueryExecutionId,
            NextToken: token,
            QueryResultType: "DATA_ROWS"
        })) as GetQueryResultsCommandOutput

        token = res.NextToken
        const result = res.ResultSet

        for (const row of result?.Rows ?? []) {
            yield row
        }
    } while (token)
}