はじめに
Amazon Athena (以下、Athena) は、Amazon S3上の膨大なデータに対して直接SQLを実行し、素早く分析やデータ加工を行えるサーバーレスなクエリサービスとして、広く利用されています。
しかし、その処理が非同期であることから、API の呼び出しは複雑です。
プログラムで処理を行おうとする場合、コードが複雑化して、バグを内包する可能性が高まります。
最悪の場合、Out of Memory (OOM) による停止や、不適切な処理による AWS SDK の過剰なコールなどのバグが発生することも考えられます。
この記事では、この処理を簡素化するためのテクニックについて紹介します。
複雑な処理をライブラリにカプセル化することで、利用側では『たった1行で Athena を安全に利用可能』となります。
使用言語は TypeScript です。
Athena の実行フロー
AWS SDK を利用して、Athena でクエリを実行する場合のフローは、以下のとおりです。

大きく分類すると、3ブロックに分かれます。
- StartQueryExecution
- SQL を渡して処理を実行する。
- GetQueryExecution
- クエリの実行状態を取得します。
- 処理が終わっていない場合、sleep を挟んで遅延させます。
- GetQueryResults
- クエリ結果を順次取得します。
- NextToken を利用して、ブロックごとに取得します。
このように、必要なコマンドが分かれるとともに、正しく処理する必要があるため、コードが複雑化しやすいです。
また、メモリについても考慮が必要です。
Athena は大規模データを取り扱っています。
オンメモリでデータをキャッシュすると、OOM となりプログラムが終了してしまう事も考えられます。
TypeScript のテクニック
このような処理を、TypeScript で取り扱う場合はいくつかのテクニックが必要です。
代表的なテクニックは以下のとおりです。
AsyncIterator
AsyncIterator は、非同期かつ、Iterable (反復可能) な処理を簡易的に表記できます。
for await (const value of iterator) {
// [value を処理]
}
記述することで、以下の問題を解決できます。
- 遅延処理の適切な待機
- for await は、AsyncIterator を処理するためのキーワードです。
- この記述により、データ取得が非同期であるデータを簡潔に記述できます。
- 逐次処理
- メモリ確保は、Iterable により 1件ごと Value にコピーされます。
- この処理方法により、巨大なデータを、有限のメモリで処理できます。
- 安全な処理
- value は
constとして定義されるため、代入によるバグが発生しません。 - Type 安全です。
- value は
今回の例で言えば、複雑な Athena の結果取得をカプセル化して 1行ずつ (Row データ) として処理することが可能となります。
AsyncGenerator
AsyncGenerator は、シーケンシャルで逐次実行可能な処理を簡易的に記述できます。
今回の場合で考えると、SQL 発行から結果の取得はシーケンシャルで逐次実行処理です。
そのため、今回の処理と相性が良いです。
AsyncGenerator を利用することで、StartQueryExecution を実行し、GetQueryExecution で待機、GetQueryResults で結果取得という一連の処理を一つのコードで記述できます。
AsyncGenerator は、AsyncIterator でもあるため、そのまま AsyncIterator として利用可能です。
async function* () {
yield [data1]
yield [data2]
yield [data3]
}
TypeScript には、巨大なデータを取り扱う方法として Stream があります。
Stream は、取り扱うデータが Byte ストリームであることや、Async 関数と相性が良くないことから、AsyncGenerator が適切です。
| AsyncGenerator | Readable (Stream) | |
|---|---|---|
| データの種類 | オブジェクト | Byte ストリーム |
| 定義方法 | async function* (Generator) |
Readable クラスの拡張 |
| データの処理 | yield で一時停止 |
_read と push |
最適化された Athena での SQL 実行
これらの処理を組み込んだ場合、上述の複雑なフローを劇的に簡略化できます。
利用する側では for await で処理を実行するだけです。
これは、可視性も高く、処理が圧倒的に簡略化されるため、バグが起こりにくくなります。

具体的なコードは、以下のようになります。
import { execute } from "./lib"
const main = async () => {
const sql = `...`
for await (const row of execute(sql)) {
// 処理
}
}
ほぼ全ての処理をライブラリ側に押し込めて、簡略化することができました。
ライブラリ化したコードは、記事下部に載せます。
これにより、大規模なデータを安全に取り扱うことが可能となります。
ライブラリ側には、以下のような機能を押し込めています。
- QueryExecutionId のカプセル化
- Athena のクエリ追跡用の ID をライブラリ内にカプセル化しています。
- ID を意識することなく、結果の取得に注力できます。
- クエリ実行の待機
StartQueryExecution実行後、結果が返ってくるまでsleep(1000)で待機します。FAILEDやCANCELLEDの場合、安全に例外で処理を行っています。- 実行側で意識することなく、遅延と待機処理をしています。
- 安全な結果の取得
- 大規模なデータセットに対して、 1行ずつ
yieldで処理を返しています。 - API からバッチ的に結果が返ってきても、呼び出し側には 1行ずつ処理を返しています。
- 大規模なデータセットに対して、 1行ずつ
まとめ
TypeScript の AsyncGenerator を活用することで、Athena の複雑な非同期実行フローを劇的に簡略化しました。
また、大規模データ処理におけるバグの可能性とメモリ負荷を軽減できます。
このパターンは、AWS SDK を利用する他の非同期処理にも応用可能です。
Amazon S3 への大規模ファイルアップロード、Amazon DynamoDB からのスキャン結果の逐次取得など、AsyncGenerator が活躍する状況は考えられます。
ぜひ、ご自身のプロジェクトで安全かつ簡潔なデータ処理を実現してください。
ライブラリ全文
/// Copyright 2026 Shirousa
/// Licensed under the [MIT License](https://opensource.org/license/mit)
import { AthenaClient, GetQueryExecutionCommand, GetQueryResultsCommand, StartQueryExecutionCommand, type GetQueryResultsCommandOutput, type Row} from "@aws-sdk/client-athena"
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
export interface Option {
workGroup?: string
}
export async function* execute(sql: string, opt: Option = {}) {
const client = new AthenaClient()
// send execute
const exec = await client.send(new StartQueryExecutionCommand({
WorkGroup: opt.workGroup ?? "primary",
QueryString: sql,
}))
// wait finish
let wait = true
do {
await sleep(1000)
const res = await client.send(new GetQueryExecutionCommand({
QueryExecutionId: exec.QueryExecutionId,
}))
const state = res.QueryExecution?.Status?.State
if (state === "SUCCEEDED") {
wait = false
}
else if (state === "FAILED" || state === "CANCELLED") {
throw new Error(`Query execution ${state}`);
}
} while (wait)
// get datas
let token: string | undefined = undefined
do {
const res = await client.send(new GetQueryResultsCommand({
QueryExecutionId: exec.QueryExecutionId,
NextToken: token,
QueryResultType: "DATA_ROWS"
})) as GetQueryResultsCommandOutput
token = res.NextToken
const result = res.ResultSet
for (const row of result?.Rows ?? []) {
yield row
}
} while (token)
}