開発チームがお届けするブログリレーです!既に公開されている記事もありますので、こちらから他のメンバーの投稿もぜひチェックしてみてください!
AWS Amplify(以下、Amplify)では、データベースとしてAmazon DynamoDB(以下、DynamoDB)やAmazon RDSの他に、Amazon OpenSearch Service(以下、OpenSearch)もAmplifyを使用して構築することができます。
この機能はAmplify Gen1から存在していましたが、Gen2ではアーキテクチャと作成方法が大幅に変更されました。
Amazon OpenSearch Service について
Amazon OpenSearch Serviceは、Apache Luceneをベースにしたオープンソースの全文検索・分析エンジンである「OpenSearch」を、AWS上で簡単にデプロイ、運用、スケールできるようにしたマネージドサービスです。
主な用途としては、以下のようなものが挙げられます。
- 全文検索: Webサイトやアプリケーションに、高速で高機能な検索機能を実装します。
- ログ分析: アプリケーションやサーバーから出力される大量のログデータをリアルタイムで収集・分析し、システムの監視やトラブルシューティングに役立てます。
- リアルタイムモニタリング: アプリケーションのパフォーマンスメトリクスやビジネスKPIを可視化し、状況をリアルタイムで把握します。
AWS Amplifyと連携させることで、AWS AppSyncのバックエンドとしてOpenSearchを利用できるようになり、スキーマに追加するだけで、特定のモデルのデータを自動的にOpenSearchにインデックスし、検索可能にすることができます。これにより、DynamoDBだけでは実現が難しい複雑な検索要件にも柔軟に対応できるアプリケーションを迅速に構築できます。
OpenSearch Ingestion について
Amazon OpenSearch Ingestionは、OpenSearch Serviceへのデータ投入を簡素化するための、サーバーレスなデータコレクターです。様々なソースからデータを収集し、フィルタリング、変換、集約などの処理を行った上で、OpenSearchのドメイン(クラスター)にデータを送信するパイプラインを構築できます。
従来は、LogstashやFluentdといった別のツールをEC2インスタンスなどで自己管理する必要がありましたが、OpenSearch Ingestionを利用することで、インフラのプロビジョニングや管理の手間なく、スケーラブルなデータ収集パイプラインを簡単に構築・運用できます。
Amplify Gen2 では、このOpenSearch Ingestionとの連携がより強化され、バックエンドの定義の中でデータソースや変換ロジックをコードで記述できるようになりました。
Gen1 時代
Gen1の時は、Amplify CLIベースでのリソース作成であったこともあり、Amplifyが提供している @searchable というディレクティブをモデル定義に付与するだけで、なんとDynamoDBと一緒に自動でAmplify側がOpenSearch + DynamoDB Stream用のLambdaなどを作成してくれました。
参考:https://docs.amplify.aws/gen1/react/build-a-backend/graphqlapi/search-and-result-aggregations/
type Chat @model @searchable @auth(rules: [ { allow: owner } ]) {
id: ID!
name: String
message: [Message] @hasMany
}
type Message @model @searchable @auth(rules: [ { allow: owner } ]) {
id: ID!
text: String
chatId: ID! @index
chat: Chat @belongsTo(fields: ["chatId"])
}
中身はAmazon DynamoDBからDynamoDB Streamを使用し、DynamoDB StreamからLambdaへのイベント配信でOpenSearch Serviceへのデータ転送が行われています。

OpenSearchの設定はかなり細かく行う必要があるのですが、Gen1では多くの設定を自動で行ってくれました。
(Gen2では細かく設定が必要になります)
Gen1で作成されるOpenSearchの主な設定値は以下です。
- エンジン: Elasticsearch 7.10
- アベイラビリティゾーン数: 1 AZ
- EBSボリュームタイプ: gp2
- EBSボリュームサイズ: 10
もう少し詳細が知りたい方は、Amplifyのコードを見ると設定値すべてが確認できます。
https://github.com/aws-amplify/amplify-category-api/blob/main/packages/amplify-graphql-searchable-transformer/src/cdk/create-searchable-domain.ts
Gen2 からの変更
構成
Gen2からはCDKでOpenSearch Serviceを作成する方針へ変更になりました。(ドキュメントからそう読み取りました)
また、以下のチュートリアルにあるようにGen1までのDynamoDB StreamだけでのOpenSearchへのデータロードではなく、OpenSearch Ingestionを使用したゼロETL統合が行えるようになりました。
https://docs.amplify.aws/react/build-a-backend/data/custom-business-logic/search-and-aggregate-queries/

https://aws.amazon.com/jp/blogs/mobile/build-a-scalable-search-solution-with-aws-amplify-and-opensearch-serverless/
私はこの構成図を見た時、取り込むためのOpenSearch IngestionにS3とDynamoDB Streamどちらからも線が伸びているのが気になりました。
OpenSearch Ingestion のドキュメントを見ると、以下の記述があります。
完全なスナップショットの場合 – DynamoDB はpoint-in-timeリカバリ (PITR) を使用してバックアップを作成し、Amazon S3 にアップロードします。OpenSearch Ingestion は、スナップショットを 1 つ以上の OpenSearch インデックスにインデックス化します。一貫性を維持するために、パイプラインはすべての DynamoDB の変更を OpenSearch と同期します。
つまりDynamoDBからOpenSearchへ取り込む際に、一番最初のインデックス化にポイントタイムリカバリで作成されたスナップショットを使用し、それ以降の変更分に関してはDynamoDB Streamをキャプチャするような構成です。
これはデータロード初回時に取り込むDynamoDBのデータ量が多すぎると、DynamoDB Streamの制限 (データ保持は最大24時間) によりデータロストが発生する可能性があります。
そのため最初のインデックス化ではスナップショットから作成することが推奨されています。
逆に全量ではなく、DynamoDB Stream有効化以降のデータのみで良いのであれば、このポイントインタイムリカバリの設定は不要です。

作成方法
作成方法もGen1から大きく変化し、 @searchable のディレクティブではなく、CDKを用いて自身でリソースを定義する必要があります。
ドキュメントの作成方法を眺めると流れが掴めます。
https://docs.amplify.aws/react/build-a-backend/data/custom-business-logic/search-and-aggregate-queries/#step-4c-add-the-appsync-resolver-for-the-search-query
以下が、Amplify上でAppSyncのバックエンドリソースとしてOpenSearchと接続するために必要な記述全てです。(ドキュメントから抜粋)
amplify/backend.ts
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import { RemovalPolicy } from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as osis from "aws-cdk-lib/aws-osis";
import * as logs from "aws-cdk-lib/aws-logs";
import { RemovalPolicy } from "aws-cdk-lib";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { storage } from "./storage/resource";
// Define backend resources
const backend = defineBackend({
auth,
data,
storage,
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// Update table settings
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
// Get the DynamoDB table ARN
const tableArn = backend.data.resources.tables["Todo"].tableArn;
// Get the DynamoDB table name
const tableName = backend.data.resources.tables["Todo"].tableName;
// Create the OpenSearch domain
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// upgrade instance types for production use
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// set removalPolicy to DESTROY to make sure the OpenSearch domain is deleted on stack deletion.
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
// Get the S3Bucket ARN
const s3BucketArn = backend.storage.resources.bucket.bucketArn;
// Get the S3Bucket Name
const s3BucketName = backend.storage.resources.bucket.bucketName;
// Create an IAM role for OpenSearch integration
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);
// Define OpenSearch index mappings
const indexName = "todo";
const indexMapping = {
settings: {
number_of_shards: 1,
number_of_replicas: 0,
},
mappings: {
properties: {
id: {
type: "keyword",
},
isDone: {
type: "boolean",
},
content: {
type: "text",
},
priority: {
type: "text",
},
},
},
};
// OpenSearch template definition
const openSearchTemplate = `
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "${tableArn}"
stream:
start_position: "LATEST"
export:
s3_bucket: "${s3BucketName}"
s3_region: "${backend.storage.stack.region}"
s3_prefix: "${tableName}/"
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
sink:
- opensearch:
hosts:
- "https://${openSearchDomain.domainEndpoint}"
index: "${indexName}"
index_type: "custom"
template_content: |
${JSON.stringify(indexMapping)}
document_id: '\${getMetadata("primary_key")}'
action: '\${getMetadata("opensearch_action")}'
document_version: '\${getMetadata("document_version")}'
document_version_type: "external"
bulk_size: 4
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
`;
// Create a CloudWatch log group
const logGroup = new logs.LogGroup(backend.data.stack, "LogGroup", {
logGroupName: "/aws/vendedlogs/OpenSearchService/pipelines/1",
removalPolicy: RemovalPolicy.DESTROY,
});
// Create an OpenSearch Integration Service pipeline
const cfnPipeline = new osis.CfnPipeline(
backend.data.stack,
"OpenSearchIntegrationPipeline",
{
maxUnits: 4,
minUnits: 1,
pipelineConfigurationBody: openSearchTemplate,
pipelineName: "dynamodb-integration-2",
logPublishingOptions: {
isLoggingEnabled: true,
cloudWatchLogDestination: {
logGroup: logGroup.logGroupName,
},
},
}
);
// Add OpenSearch data source
const osDataSource = backend.data.addOpenSearchDataSource(
"osDataSource",
openSearchDomain
);
これに加え、JSリゾルバーの作成やモデル定義を行う必要があり、Gen1に比べて多くの工数がかかります。
ただその反面、OpenSearch Serviceのエンジンやエンジンのバージョン、インスタンスタイプの指定など細かく指定できるようになりました。
それに付随して、Gen1でなんとなくOpenSearch Serviceを運用していたプロジェクトでは、よりOpenSearch Serviceについてしっかり理解する必要があります。どのインスタンスタイプを使用すればコストパフォーマンスが良くなるか、ノード数やシャード数はどうするかなど多くの検討が必要です。
ハマったところ
Amplify Gen 2 でDynamoDB StreamからOpenSearch Serviceへの連携をする上で、ほぼリアルタイム(遅延は1秒以下を許容)でデータロードを実現する必要がありました。
しかしAmplifyのドキュメントに記載された設定値では、最大60秒の遅延が発生していました。
↓Amplifyのチュートリアルで作成したDynamoDBテーブルをOpenSearchと連携し、AppSyncからどちらも取得してみた画像 (listChat=DynamoDB, searchChat=OpenSearch)

この解決策は、OpenSearch側の設定値に flush_timeout を指定する必要がありました。
flush_timeout:
A long class that contains the amount of time, in milliseconds, to try packing a bulk request up to the bulk_size before flushing the request. If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed. Set to -1 to disable the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or 1 minute.
https://docs.opensearch.org/docs/2.18/data-prepper/pipelines/configuration/sinks/opensearch/
この事象からわかることは、OpenSearchの設定値をOpenSearch側のドキュメントを参照して設定する必要があるいうことです。
Gen1の「手軽に使える」という体験からのギャップは大きく、Gen2では作成における学習コストが上がったと感じます。
しかし、必然的にOpenSearchの仕様について知見を深めることになり、開発者として運用面を考慮した健全な開発に繋がるとも言えるかもしれません。
まとめ
Amplify Gen 2では、OpenSearchの連携がより柔軟かつ強力になった一方で、構成や設定の自由度が増した分、求められる知識や実装の難易度も上がりました。
特に初回インデックスにスナップショットを使う構成や、細かいパラメータ調整が必要な点からも、Gen1の「簡単に使える」印象とは大きく異なります。
ただし、その分、プロダクションレベルの検索機能を自身の管理下で詳細に設計・最適化できるようになっており、使いこなせば非常に強力な構成が実現できます。