はじめに
前回の続きです。まずこちらを読んでいただければと思います。
概要
構成図を再掲します。
前回は以下の内容を紹介しました。
- 概要
- Slackbot: 共通定義
- Slackbot: Producer 関数 (図中1)
今回はその続きで、次の内容を取り上げます。
- Slackbot: Consumer 関数 (図中2)
- AWS CDK によるインフラ構築
Consumer 関数
エージェントの呼び出し
こちらの InvokeModel API のサンプルを参考にしました。
func invokeAgent(text, timestamp string) (string, error) { in := &bedrockagentruntime.InvokeAgentInput{ InputText: aws.String(text), AgentId: aws.String(envs["AGENT_ID"]), AgentAliasId: aws.String(envs["AGENT_ALIAS_ID"]), EnableTrace: aws.Bool(false), EndSession: aws.Bool(false), SessionId: aws.String(timestamp), } out, err := wr.agentClient.InvokeAgent(wr.ctx, in) if err != nil { return "", err } var b strings.Builder var n int for event := range out.GetStream().Events() { switch v := event.(type) { case *types.ResponseStreamMemberChunk: b.WriteString(string(v.Value.Bytes)) n++ case *types.UnknownUnionMember: return "", fmt.Errorf("unknown tag: %s", v.Tag) default: return "", fmt.Errorf("union is nil or unknown type") } } fmt.Printf("success: invoke agent: total chunks received: %d\n", n) return b.String(), nil }
InvokeAgent API はリクエスト間で SessionId
が同じ値であれば同じ会話と判断し、履歴を保持します。ここではセッション=スレッドと捉え、 SessionId
には前回記事で紹介したようにスレッドのタイムスタンプ値を渡します。同一スレッド内の会話履歴を保持し、文脈に沿った回答ができるようになります。
また、コード中の out.GetStream().Events()
ですが、AWS SDK for Go v2 での InvokeAgent API のレスポンスはチャネルで逐次的に受信する設計になっています。ただし Issue にある通り、現時点では InvokeAgent API 自体が常にひとつのチャンクしか返さないようです。つまり、ストリーム志向の API だが現時点ではストリームの機能は提供しないと解釈できます。このような状況を鑑み、上記コードでは各チャンクを strings.Builder
で結合して返す体裁にしています。
ハンドラー
以下では SQS キューから取り出したメッセージを Go で扱えるように Unmarshal (デシリアライズ) しています。のちほど触れますが SQS イベントソースのバッチサイズを 1
にしているので、Records
の最初の要素だけ取得するようにします。
var msg slackbot.QueueMessage body := req.Records[0].Body if err := json.Unmarshal([]byte(body), &msg); err != nil { return err }
取り出したメッセージからテキストとタイムスタンプ値を渡してエージェントを呼び出します。
answer, err := invokeAgent(msg.InputText, msg.TimeStamp) if err != nil { return err }
エージェントからの回答を Slack に投稿します。せっかくの Bot なので、オプションを少し凝っています。
opts := []slack.MsgOption{ slack.MsgOptionBlocks( &slack.SectionBlock{ Type: slack.MBTSection, Text: &slack.TextBlockObject{ Type: slack.MarkdownType, Text: answer, }, }, &slack.DividerBlock{ Type: slack.MBTDivider, }, &slack.ContextBlock{ Type: slack.MBTContext, ContextElements: slack.ContextElements{ Elements: []slack.MixedElement{ &slack.TextBlockObject{ Type: slack.PlainTextType, Text: slackbot.ContextMessage, }, }, }, }, ), slack.MsgOptionTS(msg.TimeStamp), } if _, _, err := wr.slackClient.PostMessage(msg.ChannelID, opts...); err != nil { return err }
ContextMessage
はここで定義していますが、こんな感じの見え方になります。
最後に初期応答「回答を準備中…」を削除します。
if _, _, err := wr.slackClient.DeleteMessage(msg.InitialMessageChannelID, msg.InitialMessageTimeStamp); err != nil { return err }
Dockerfile
両方の関数で同じ Dockerfile を使います。ARG NAME
が肝です。
FROM golang:1.22.4-alpine3.20 as build ARG HTTP_PROXY ARG HTTPS_PROXY ARG NAME WORKDIR /slackbot COPY go.mod go.sum utils.go ./ COPY ${NAME} ./${NAME}/ RUN HTTP_PROXY=${HTTP_PROXY} HTTPS_PROXY=${HTTPS_PROXY} go build -o main ${NAME}/main.go FROM alpine:3.20 COPY --from=build /slackbot/main /main ENTRYPOINT [ "/main" ]
以下のディレクトリ構成なので、CDK でビルドする際 buildArgs
で NAME
に producer
を渡すか consumer
を渡すかで切り替えます。
lib/image/slackbot ├── consumer │ └── main.go ├── producer │ └── main.go ├── Dockerfile ├── go.mod ├── go.sum └── utils.go
余談ですが、Go はバイナリにランタイムも含まれるので、マルチステージビルドと相性がいい点も魅力ですね。OS イメージの所定のディレクトリにバイナリを置くだけで動きます。
CDK
資材は揃ったので、それをデプロイするための環境を整備します。
. ├── bin │ └── main.ts ├── lib │ ├── constructs │ │ ├── action.ts │ │ ├── bedrock.ts │ │ └── slackbot.ts │ ├── image │ │ ├── agent-go │ │ │ ├── Dockerfile │ │ │ ├── go.mod │ │ │ ├── go.sum │ │ │ └── main.go │ │ └── slackbot │ │ ├── consumer │ │ │ └── main.go │ │ ├── producer │ │ │ └── main.go │ │ ├── Dockerfile │ │ ├── go.mod │ │ ├── go.sum │ │ └── utils.go │ ├── instruction │ │ └── instruction │ ├── schema │ │ └── schema.yaml │ └── stack.ts ├── test │ └── cdk-bedrock.test.ts ├── LICENSE ├── README.md ├── cdk-bedrock.code-workspace ├── cdk.context.json ├── cdk.json ├── jest.config.js ├── package-lock.json ├── package.json ├── parameter.template.ts ├── parameter.ts └── tsconfig.json
SQS キュー
メッセージを受け渡すキューを作ります。特に変わったことはしていません。
const queue = new cdk.aws_sqs.Queue(this, "Queue", { queueName: `${props.serviceName}-slackbot-queue`, encryption: cdk.aws_sqs.QueueEncryption.SQS_MANAGED, enforceSSL: true, removalPolicy: cdk.RemovalPolicy.DESTROY, retentionPeriod: cdk.Duration.days(7), visibilityTimeout: cdk.Duration.minutes(5), receiveMessageWaitTime: cdk.Duration.seconds(10), });
Producer 関数
以下の点が特に重要です。
buildArgs
のNAME
に"producer"
を渡すことで Dockerfile のARG NAME
に適用queue.grantSendMessages()
でキューにメッセージを送信する権限を IAM ロールに追加する
const producerRole = new cdk.aws_iam.Role(this, "ProducerRole", { roleName: `${props.serviceName}-slackbot-producer-role`, assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"), inlinePolicies: { SlackBotProducerRoleAdditionalPolicy: new cdk.aws_iam.PolicyDocument({ statements: [ new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"], resources: ["arn:aws:logs:*:*:*"], }), ], }), }, }); const producerFn = new cdk.aws_lambda.DockerImageFunction(this, "ProducerFunction", { functionName: `${props.serviceName}-slackbot-producer`, description: `${props.serviceName}-slackbot-producer`, code: cdk.aws_lambda.DockerImageCode.fromImageAsset("lib/image/slackbot", { buildArgs: { NAME: "producer", HTTP_PROXY: props.httpProxy, HTTPS_PROXY: props.httpProxy, }, }), architecture: cdk.aws_lambda.Architecture.ARM_64, role: producerRole, logRetention: cdk.aws_logs.RetentionDays.THREE_DAYS, currentVersionOptions: { removalPolicy: cdk.RemovalPolicy.RETAIN, }, timeout: cdk.Duration.seconds(3), environment: { QUEUE_URL: queue.queueUrl, SLACK_OAUTH_TOKEN: props.slackOAuthToken, SLACK_SIGNING_SECRET: props.slackSigningSecret, }, }); const producerAlias = new cdk.aws_lambda.Alias(this, "ProducerAlias", { aliasName: "live", version: producerFn.currentVersion, }); queue.grantSendMessages(producerAlias);
タイムアウト値については、3 秒以内で処理が完了した場合でも行き戻りの時間を加味した場合 Slack 側ではリトライ対象になったりしますが、そこはあまりこだわらずに 3 秒としています。
Consumer 関数
Consumer 側はタイムアウト値を大きめにしておきます。
buildArgs
のName
に"consumer"
を渡すことで Dockerfile のARG NAME
に適用- エージェントを呼び出すので、Lambda のタイムアウトは余裕のある値を設定
consumerAlias.addEventSource()
で Lambda 関数のイベントソースに SQS キューを設定- バッチサイズは今回
1
に設定
const consumerFn = new cdk.aws_lambda.DockerImageFunction(this, "ConsumerFunction", { functionName: `${props.serviceName}-slackbot-consumer`, description: `${props.serviceName}-slackbot-consumer`, code: cdk.aws_lambda.DockerImageCode.fromImageAsset("lib/image/slackbot", { buildArgs: { NAME: "consumer", HTTP_PROXY: props.httpProxy, HTTPS_PROXY: props.httpProxy, }, }), architecture: cdk.aws_lambda.Architecture.ARM_64, role: consumerRole, logRetention: cdk.aws_logs.RetentionDays.THREE_DAYS, currentVersionOptions: { removalPolicy: cdk.RemovalPolicy.RETAIN, }, timeout: cdk.Duration.minutes(5), environment: { AGENT_ID: props.agent.agentId, AGENT_ALIAS_ID: props.agent.aliasId!, QUEUE_URL: queue.queueUrl, SLACK_OAUTH_TOKEN: props.slackOAuthToken, SLACK_SIGNING_SECRET: props.slackSigningSecret, }, }); const consumerAlias = new cdk.aws_lambda.Alias(this, "ConsumerAlias", { aliasName: "live", version: consumerFn.currentVersion, }); queue.grantConsumeMessages(consumerAlias); consumerAlias.addEventSource( new cdk.aws_lambda_event_sources.SqsEventSource(queue, { batchSize: 1, maxConcurrency: 2, }) );
ACM 証明書
API Gateway にカスタムドメインを設定するため、証明書を作ります。
なぜカスタムドメインを設定するかというと、Slack API 側でのエンドポイント URL の設定を固定にしたいからです。Slack/AWS 間がより疎結合となり、トライアンドエラーしやすくなります。
const hostedZone = cdk.aws_route53.HostedZone.fromLookup(this, "HostedZone", { domainName: props.hostZoneName, }); const certificate = new cdk.aws_certificatemanager.Certificate(this, "Certificate", { certificateName: `${props.serviceName}-cert`, domainName: props.domainName, subjectAlternativeNames: ["*." + props.domainName], validation: cdk.aws_certificatemanager.CertificateValidation.fromDns(hostedZone), });
API Gateway
API Gateway (HTTP API) を作成し、エンドポイント slack/callback
を生やします。カスタムドメインを使うので、デフォルトのエンドポイントは無効にしています。
const domainName = new cdk.aws_apigatewayv2.DomainName(this, "Domain", { domainName: props.domainName, certificate: certificate, }); const apiName = `${props.serviceName}-slackbot-gateway`; const api = new cdk.aws_apigatewayv2.HttpApi(this, "API", { apiName: apiName, description: apiName, disableExecuteApiEndpoint: true, defaultDomainMapping: { domainName: domainName, }, }); api.addRoutes({ methods: [cdk.aws_apigatewayv2.HttpMethod.ANY], path: "/slack/callback", integration: new cdk.aws_apigatewayv2_integrations.HttpLambdaIntegration("Callback", producerAlias), });
ログを有効にしますが、そのためには現状エスケープハッチが必要です (参考)
const logGroup = new cdk.aws_logs.LogGroup(this, "LogGroup", { logGroupName: `/aws/apigateway/${apiName}`, removalPolicy: cdk.RemovalPolicy.DESTROY, retention: cdk.aws_logs.RetentionDays.THREE_DAYS, }); const defaultStage = api.defaultStage?.node.defaultChild as cdk.aws_apigatewayv2.CfnStage; defaultStage.accessLogSettings = { destinationArn: logGroup.logGroupArn, format: JSON.stringify({ requestId: "$context.requestId", ip: "$context.identity.sourceIp", caller: "$context.identity.caller", user: "$context.identity.user", requestTime: "$context.requestTime", httpMethod: "$context.httpMethod", resourcePath: "$context.resourcePath", status: "$context.status", protocol: "$context.protocol", responseLength: "$context.responseLength", }), };
最後にカスタムドメインを Route 53 に登録します。これで、いくらスタックを作り直しても Slack API 側の URL 設定を変更する必要がなくなりました。
const aRecord = new cdk.aws_route53.ARecord(this, "ARecord", { recordName: props.domainName, target: cdk.aws_route53.RecordTarget.fromAlias( new cdk.aws_route53_targets.ApiGatewayv2DomainProperties( domainName.regionalDomainName, domainName.regionalHostedZoneId ) ), zone: hostedZone, }); aRecord.node.addDependency(api);
おわりに
次回はインストラクションのチューニングを紹介し、実際に Slackbot の動作確認してみます。