はじめに
今回は、2025年7月にサポートが開始されたOpenSearch Ingestion pipelinesのAurora MySQLとの統合でのOpenSearch Serviceへのデータ反映速度を計測したいと思います。
Amazon OpenSearch Ingestion pipelinesとは
Auroraをデータソースとして、OpenSearch ServiceへAurora側のデータの追加、更新、削除などがストリーミングすることができます。
これによって、複雑なパイプラインを自作することなくAuroraをデータソースとしてOpenSearch Serviceとの連携が可能となります。
構築
今回はプライベートのAuroraとVPC内に配置したOpenSearch Serviceを連携したいと思います。
Auroraにデータを入れるのと、反映速度の計測のため別途EC2も作成しています。
※ AuroraとOpenSearch Serviceの構築は割愛します。
DBクラスタのパラメータグループ作成
下記のようにパラメータの設定変更が必要なため、パラメータグループを作成して対象のAuroraに適用します。
aurora_enhanced_binlog=1 binlog_backup=0 binlog_format=ROW binlog_replication_globaldb=0 binlog_row_image=full binlog_row_metadata=full
KMSキーを作成
初回の連携時には、一度Auroraのデータを全てS3にExportし、その後にパイプラインを通じてOpenSearchに初回連携されるため、S3のデータを暗号化するためのKMSキーを作成します。
パイプラインではカスタムキーのみのため、作成は必須になります。
キーポリシーにはのちに作成するパイプラインのIAMロールをPrincipalに設定
{
"Id": "key-consolepolicy-3",
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::xxxxxxxxx:root"
},
"Action": "kms:*",
"Resource": "*"
},
{
"Sid": "Allow OpenSearch Ingestion Pipeline",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::xxxxxxxxx:role/service-role/OpenSearchIngestion-rds-392ed7"
},
"Action": [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:DescribeKey"
],
"Resource": "*"
}
]
}
S3バケットの作成
Auroraのデータが出力されるバケットになります。
KMSキーを使用したサーバー側の暗号化を行います。
先ほど作成したKMSキーを指定します。
AuroraExport用のロールを作成
AuroraからS3へExportするためのロールを作成します。
信頼されたエンティティは下記です。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Effect": "Allow",
"Principal": {
"Service": "export.rds.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
権限ポリシーは下記です。
先ほど作成したS3バケットとKMSキーを指定します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3Export",
"Effect": "Allow",
"Action": [
"s3:PutObject*",
"s3:ListBucket",
"s3:GetObject*",
"s3:DeleteObject*",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::xxxxxx-export-bucket",
"arn:aws:s3:::xxxxxx-export-bucket/*"
]
},
{
"Sid": "AllowKmsUse",
"Effect": "Allow",
"Action": [
"kms:GenerateDataKey",
"kms:Encrypt",
"kms:Decrypt",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:ap-northeast-1:xxxxxxxxxxxx:key/xxxxxxxx"
}
]
}
パイプラインの設定
version: '2'
extension:
aws:
secrets:
my-db-secret:
secret_id: >-
{SecretManager(DB認証情報格納)arn}
region: ap-northeast-1
refresh_interval: PT1H
osis_configuration_metadata:
builder_type: visual
rds-93cba2:
source:
rds:
db_identifier: {DBクラスタ名}
engine: aurora-mysql
aws:
region: ap-northeast-1
database: {DB名}
tables:
include: []
exclude: []
s3_bucket: {S3バケット名}
s3_region: ap-northeast-1
export:
kms_key_id: >-
{KMSキーarn}
iam_role_arn: >-
{AuroraExportロールarn}
stream: true
authentication:
username: '${{aws_secrets:my-db-secret:username}}'
password: '${{aws_secrets:my-db-secret:password}}'
processor: []
sink:
- opensearch:
hosts:
- >-
{OpenSearchServiceドメインエンドポイント}
aws:
serverless: false
region: ap-northeast-1
index_type: custom
index: 'index-${getMetadata("table_name")}'
document_id: '${getMetadata("primary_key")}'
action: '${getMetadata("opensearch_action")}'
document_version: '${getMetadata("document_version")}'
document_version_type: external
パイプラインのIAMロールは自動作成で問題ないかと思います。
ただし、パイプライン作成時にVPCアタッチにおいて権限エラーになる場合があります。
その際は、自動作成されたIAMロールのcreateNetWorkInterfaceの箇所にはコンソールで指定したサブネットとセキュリティグループが指定されていると思いますが、これだと現状は失敗することがあるためワイルドカード指定にし、アクションも追加します。
変更前:
{
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface"
],
"Resource": [
"arn:aws:ec2:ap-northeast-1:xxxxxxxxxxxx:subnet/subnet-xxxxxxxxxx",
"arn:aws:ec2:ap-northeast-1:xxxxxxxxxxxx:subnet/subnet-xxxxxxxxxx",
"arn:aws:ec2:ap-northeast-1:xxxxxxxxxxxx:security-group/sg-xxxxxxxxxx"
]
},
変更後:
{
"Effect": "Allow",
"Action": [
"ec2:AttachNetworkInterface",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface",
"ec2:DeleteNetworkInterfacePermission",
"ec2:DetachNetworkInterface",
"ec2:DescribeNetworkInterfaces"
],
"Resource": [
"*"
]
},
検証
構築ができたら、Auroraに別途接続し手動でデータ変更を行います。
同時に踏み台EC2に入りそこからOpenSearchへアクセスしてデータ変更を監視します。
監視スクリプト
テナント1と2のそれぞれのUser01とUser02の合計4ユーザーの現在の郵便番号と変更後の郵便番号を設定しています。
変更後の値として設定した郵便番号に切り替わるまでの秒数を1秒ごとに監視してDBの変更がOpenSearch Serviceへ反映されるまでの時間を計測します。
#!/bin/bash
# OpenSearchのエンドポイント
BASE_URL="https://xxxxxx.ap-northeast-1.es.amazonaws.com/index-{テーブル名}/_search"
AUTH="{ユーザー名}:{パスワード}" #OpenSearchサービスへのログイン情報
# フラグの初期化
found_1=false
found_2=false
found_3=false
found_4=false
start_time=$(date +%s)
echo "--- ⏱ 監視スタート (4件の同時変更待ち) ---"
echo "1. Tenant:1 / User:..01 -> zip:6666"
echo "2. Tenant:1 / User:..02 -> zip:7777"
echo "3. Tenant:2 / User:..01 -> zip:8888"
echo "4. Tenant:2 / User:..02 -> zip:9999"
echo "----------------------------------------"
while true; do
current_time=$(date +%s)
elapsed=$(( current_time - start_time ))
# --- 1件目 (Tenant:1, User:01 -> 6666) ---
if [ "$found_1" = false ]; then
count=$(curl -s -X GET -u "$AUTH" "$BASE_URL?q=tenant_id:1+AND+user_id:1+AND+zip_code:6666&pretty" | grep -o '"value" : [0-9]*' | awk '{print $3}')
if [ "$count" -ge 1 ]; then
echo ":white_check_mark: [1] Tenant:1 / Cust:..01 反映完了! ($elapsed 秒)"
found_1=true
fi
fi
# --- 2件目 (Tenant:1, User:02 -> 7777) ---
if [ "$found_2" = false ]; then
count=$(curl -s -X GET -u "$AUTH" "$BASE_URL?q=tenant_id:1+AND+user_id:2+AND+zip_code:7777&pretty" | grep -o '"value" : [0-9]*' | awk '{print $3}')
if [ "$count" -ge 1 ]; then
echo ":white_check_mark: [2] Tenant:1 / User:..02 反映完了! ($elapsed 秒)"
found_2=true
fi
fi
# --- 3件目 (Tenant:2, User:01 -> 8888) ---
if [ "$found_3" = false ]; then
count=$(curl -s -X GET -u "$AUTH" "$BASE_URL?q=tenant_id:2+AND+user_id:1+AND+zip_code:8888&pretty" | grep -o '"value" : [0-9]*' | awk '{print $3}')
if [ "$count" -ge 1 ]; then
echo ":white_check_mark: [3] Tenant:2 / User:..01 反映完了! ($elapsed 秒)"
found_3=true
fi
fi
# --- 4件目 (Tenant:2, User:02 -> 9999) ---
if [ "$found_4" = false ]; then
count=$(curl -s -X GET -u "$AUTH" "$BASE_URL?q=tenant_id:2+AND+user_id:2+AND+zip_code:9999&pretty" | grep -o '"value" : [0-9]*' | awk '{print $3}')
if [ "$count" -ge 1 ]; then
echo ":white_check_mark: [4] Tenant:2 / User:..02 反映完了! ($elapsed 秒)"
found_4=true
fi
fi
# 全て見つかったら終了
if [ "$found_1" = true ] && [ "$found_2" = true ] && [ "$found_3" = true ] && [ "$found_4" = true ]; then
echo "----------------------------------------"
echo ":tada: 全4件の同期が完了しました"
break
fi
sleep 1
done
結果
--- ⏱ 監視スタート (4件の同時変更待ち) --- 1. Tenant:1 / User:..01 -> zip:6666 2. Tenant:1 / User:..02 -> zip:7777 3. Tenant:2 / User:..01 -> zip:8888 4. Tenant:2 / User:..02 -> zip:9999 ---------------------------------------- :white_check_mark: [3] Tenant:2 / User:..01 反映完了! (15 秒) :white_check_mark: [4] Tenant:2 / User:..02 反映完了! (15 秒) :white_check_mark: [1] Tenant:1 / User:..01 反映完了! (16 秒) :white_check_mark: [2] Tenant:1 / User:..02 反映完了! (16 秒) ---------------------------------------- :tada: 全4件の同期が完了しました
15秒と16秒と結果が出ました。
ただ、今回1秒ごとの監視なのでこの1秒差は監視タイミングのラグの可能性もあります。
その他何度か1件や2件で実施しましたがどの件数でも15秒〜16秒での更新完了でした。
さいごに
構築は少しめんどくさいですが、Kinesisなど他のサービスを組み合わせて自前でパイプラインを作成し保守していくことを考えると、かなり嬉しいサービスかつデータの反映時間も大体のユースケースでは許容できる問題ない速度かと思いました。
ただパイプラインは立ち上げているだけでもコストがかなり高いので、そこはネックになるかと思います。
費用より運用や保守の工数を下げたい場合は選択肢になると思います。