はじめに

データ分析用データとしてRedshiftにデータを集約するだけでなく、分析・加工済データを再度アプリケーション側のAurora PostgreSQLに移し(複製し)、アプリケーションで活用するリバースETL、データ同期のニーズが増えています。

しかし、Redshiftに蓄積された数千万件、数億件データを一般的なINSERT文や直列のコピー処理でAuroraへ移そうとすると、「制限時間内に終わらない」といったパフォーマンスの壁に直面します。

今回は、RedshiftのUNLOADコマンドとAurora PostgreSQLのaws_s3拡張機能を組み合わせ、さらにPythonのThreadPoolExecutorによるマルチスレッド制御で高速化した実装例をご紹介します。

アーキテクチャの概要

今回は、以下の処理をAWS Batch上のPythonジョブとして実装しました。

  1. RedshiftのUNLOAD処理:S3へCSVデータをエクスポート
  2. 高速書き込み用の一時テーブルを作成:PKやインデックス、データのない空テーブルを準備
  3. Auroraテーブルに並列インポート:aws_s3拡張機能とPythonのThreadPoolExecutor を使い、CSVを並列流し込み
  4. 後処理:一時テーブルにPKを付与し、本番テーブルへスワップ(入れ替え)

【事前準備】Auroraでaws_s3機能を有効化する

今回、Aurora PostgreSQLからS3上のCSVデータを直接インポートするために、aws_s3拡張機能を使用します。実行にあたって、以下の2つの準備をあらかじめ完了させておく必要があります。

(1)IAMロールの作成とAuroraクラスターへの関連付け

AuroraがS3バケットからデータを読み取るための権限(IAMロール)が必要です。以下の手順で設定します。

  • IAMロールの作成: rds.amazonaws.com を信頼されたエンティティとし、対象のS3バケットに対する s3:GetObject および s3:ListBucket 権限を持つロールを作成します。
  • クラスターへの追加: RDSコンソールの「接続とセキュリティ」タブにある「IAMロールを管理」から、作成したロールをクラスターに関連付けます。この際、機能(Feature)として s3Import を選択してください。

(2)DB内での拡張機能インストール

DBの管理者ユーザーでログインし、以下のSQLを実行して拡張機能を有効化します。

CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;

これにより、aws_s3.table_import_from_s3 などの関数が使用可能になります。
詳細な設定手順については、以下の公式ドキュメントも併せて参照してください。

1. RedshiftのUNLOAD処理:S3へCSVデータをエクスポート

まずはRedshiftからS3へCSVデータを払い出します。

UNLOAD ('SELECT col1, col2, col3, col4 FROM redshift_schema.table_name')
TO 's3://<s3バケット名>/<s3キー>'
IAM_ROLE 'arn:aws:iam::<アカウントID>:role/<Redshift用ロール名>'
FORMAT AS CSV
PARALLEL ON
MAXFILESIZE 512 MB
EXTENSION 'csv';
  • PARALLEL ON:Redshiftのスライスごとに並列でS3へ書き出すため、エクスポート時間を大幅に短縮化できます。
  • MAXFILESIZE:1ファイルあたりのサイズを制限します。
  • EXTENSION ‘csv’:csv形式で出力します。
    • ※ csv.gz(圧縮)とすると転送効率は上がりますが、Auroraインポート時にS3バケットに追加設定が必要になるため、今回は運用上の都合でcsvとしました。
  • 0レコードファイルへの対策:
    PARALLEL ON を有効化した場合、データ量やスライス数の影響により、0レコードファイルが生成されることがあります。これらのファイルは後続Auroraインポートでエラー扱いとなるため、本実装ではヘッダーなしで出力し、0Bファイルを除外してインポートする対策を行っています。

2. 高速書き込み用の一時テーブルを作成

Aurora側で、書き込み速度を最大化するために一時テーブルを作成します。

-- 本番テーブルから構造のみコピー(データやPK、インデックスなし)
CREATE TABLE staging_table AS SELECT * FROM production_table WITH NO DATA;

-- 大量投入時の負荷を抑えるため autovacuum を一時停止
ALTER TABLE staging_table SET (autovacuum_enabled = false);
  • PKやインデックスは引き継がない:インデックスがある状態で大量インサートすると、B-treeの再構築が発生し、パフォーマンスに影響するためです。
  • autovacuum無効:インポート中に自動メンテナンスが走ると、CPUやI/Oリソースを奪い合い、書き込みを阻害するため一時的に停止させます。

3. Auroraテーブルに並列インポート

Auroraの一時テーブルに対して、S3からCSVを取り込みます。インポートを開始する前にS3上のファイルサイズを確認し、データが存在するもの(0Bではないもの)だけを対象とします。

1ファイル単位のインポート処理

def import_csv_to_aurora_staging_table(
    aurora_secret_values,
    staging_table_name,      # "schema.table" 形式を想定
    column_list,             # カラム名のリスト
    bucket_name,
    s3_key,
    region_name,
    idx,
    total,
):
    conn = None
    try:
        # スレッドごとに独立したコネクションを確立
        conn = connect_aurora(aurora_secret_values)
        conn.autocommit = True
        cur = conn.cursor()

        # 書き込みパフォーマンス向上のため、このセッションのみ同期コミットをオフにする
        cur.execute("SET synchronous_commit = off;")

        cols_str = ",".join(column_list)

        # aws_s3.table_import_from_s3 を呼び出し
        import_query = f"""
            SELECT aws_s3.table_import_from_s3(
                '{staging_table_name}',
                '{cols_str}',
                '(FORMAT csv, HEADER FALSE, DELIMITER '','')',
                '{bucket_name}',
                '{s3_key}',
                '{region_name}'
            );
        """

        execute_query(cur, import_query)
        logging.info(f"[{idx}/{total}] IMPORT OK (key={s3_key})")
        return True

    except Exception as e:
        logging.error(f"[{idx}/{total}] IMPORT NG (key={s3_key}) error={e}")
        raise # 例外を投げて main 側の as_completed で検知させる
    finally:
        if conn:
            conn.close()
  • aws_s3.table_import_from_s3 を使用:DBエンジン自体がS3から直接データを取得するため、Python環境のメモリを消費せず、オーバーヘッドが極めて小さいのが特徴です。
  • synchronous_commit = off:ディスクへの物理書き込み完了を待たずに次へ進むため、書き込みスループットが向上します。

インポートの並列実行

上記の「1ファイル単位のインポート処理」をworkers分だけ並列化します。
workers数はAuroraスペックなどに合わせて調整・設定します。

def main():
    # ---- (省略) ----
    # S3から取得したキーのうち、0B(空)のファイルを除外する
    # ※ s3_client.head_object() 等で ContentLength > 0 のものに絞り込む
    valid_s3_keys = filter_nonzero_s3_keys(s3_bucket_name, all_s3_keys)

    total = len(valid_s3_keys)
    logging.info(f"Start parallel import: workers={workers}, files={total}")

    futures = []
    # ThreadPoolExecutor を使って並列実行
    with ThreadPoolExecutor(max_workers=workers) as ex:
        for i, s3_key in enumerate(valid_s3_keys, start=1):
            futures.append(
                ex.submit(
                    import_csv_to_aurora_staging_table,
                    aurora_secret_value,
                    f"{dest_schema_name}.{staging_table_name}",
                    columns_list,
                    s3_bucket_name,
                    s3_key,
                    region_name,
                    i,
                    total,
                )
            )

        # 全スレッドの結果を監視
        for f in as_completed(futures):
            # いずれかのスレッドでエラーがあると、ここで例外がスローされる
            _ = f.result()

    logging.info("All imports finished.")
    # ---- (省略) ----
  • ThreadPoolExecutor の活用:分割されたS3ファイルを複数のワーカーで同時にインポートすることで、Auroraのリソース(CPU/IO)をフル活用します。
  • as_completed によるエラー検知:いずれかのファイルでエラーが出た際、即座にメインプロセスで検知してジョブを止めることができ、不完全な同期を防げます。

4. 後処理:一時テーブルにPKを付与し、本番テーブルにスワップ

全てのインポートが完了したら、最後にテーブルを仕上げて本番環境へ反映させます。

-- 1. 投入完了後に一括でプライマリキーを付与
ALTER TABLE staging_table ADD PRIMARY KEY (col1, col2);

-- 2. 統計情報を最新化
ANALYZE staging_table;

-- 3. autovacuum設定を戻す
ALTER TABLE staging_table RESET (autovacuum_enabled);

-- 4. 本番テーブルとスワップ(入れ替え)
BEGIN;
DROP TABLE production_table;
ALTER TABLE staging_table RENAME TO production_table;
COMMIT;

インポートが終わってから ADD PRIMARY KEY を行うことで、データの流し込み時間を最短化できます。また、最後に ANALYZE を実行して統計情報を最新化しておくことで、入れ替え直後からクエリパフォーマンスを最適化できます。

実施結果

7~8億レコードという膨大なデータ同期を、約1時間で完了することができました。
また、データ量が数千万レコード程度であれば5分以内で終えることも可能です。

  1. RedshiftのUNLOAD処理:約40分
  2. 高速書き込み用の一時テーブル作成:数ミリ秒
  3. Aurora並列インポート:約10分
  4. 後処理(PK付与・スワップ):約10分(主にPK付与の時間。スワップは数ミリ秒)

おわりに

大量のデータをRedshiftからAuroraへ移行する場合、データの通り道を「S3」に集約し、DBエンジンの機能を並列で叩くのが効率的です。

また、一時テーブル(Staging Table)でインポートやPK付与をすべて完結させてから最後にスワップする方式をとることで、運用上の安全性も高まります。万が一、インポート途中にエラーが発生しても、最後のスワップ処理を実行されなければ本番テーブルには一切影響がありません。「壊れたデータを本番に入れてしまった」という事故を防ぎつつ、最小のダウンタイムで同期できるのがこの手法の強みです。

同様のデータ同期パイプラインを検討されている方の参考になれば幸いです。