はじめに

前回の続きです。まずこちらを読んでいただければと思います。

概要

構成図を再掲します。

108124_diagram

前回は以下の内容を紹介しました。

  • 概要
  • Slackbot: 共通定義
  • Slackbot: Producer 関数 (図中1)

今回はその続きで、次の内容を取り上げます。

  • Slackbot: Consumer 関数 (図中2)
  • AWS CDK によるインフラ構築

Consumer 関数

全体像

エージェントの呼び出し

こちらの InvokeModel API のサンプルを参考にしました。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func invokeAgent(text, timestamp string) (string, error) {
    in := &bedrockagentruntime.InvokeAgentInput{
        InputText:    aws.String(text),
        AgentId:      aws.String(envs["AGENT_ID"]),
        AgentAliasId: aws.String(envs["AGENT_ALIAS_ID"]),
        EnableTrace:  aws.Bool(false),
        EndSession:   aws.Bool(false),
        SessionId:    aws.String(timestamp),
    }
    out, err := wr.agentClient.InvokeAgent(wr.ctx, in)
    if err != nil {
        return "", err
    }
 
    var b strings.Builder
    var n int
    for event := range out.GetStream().Events() {
        switch v := event.(type) {
        case *types.ResponseStreamMemberChunk:
            b.WriteString(string(v.Value.Bytes))
            n++
        case *types.UnknownUnionMember:
            return "", fmt.Errorf("unknown tag: %s", v.Tag)
        default:
            return "", fmt.Errorf("union is nil or unknown type")
        }
    }
 
    fmt.Printf("success: invoke agent: total chunks received: %d\n", n)
    return b.String(), nil
}

InvokeAgent API はリクエスト間で SessionId が同じ値であれば同じ会話と判断し、履歴を保持します。ここではセッション=スレッドと捉え、 SessionId には前回記事で紹介したようにスレッドのタイムスタンプ値を渡します。同一スレッド内の会話履歴を保持し、文脈に沿った回答ができるようになります。

また、コード中の out.GetStream().Events() ですが、AWS SDK for Go v2 での InvokeAgent API のレスポンスはチャネルで逐次的に受信する設計になっています。ただし Issue にある通り、現時点では InvokeAgent API 自体が常にひとつのチャンクしか返さないようです。つまり、ストリーム志向の API だが現時点ではストリームの機能は提供しないと解釈できます。このような状況を鑑み、上記コードでは各チャンクを strings.Builder で結合して返す体裁にしています。

ハンドラー

以下では SQS キューから取り出したメッセージを Go で扱えるように Unmarshal (デシリアライズ) しています。のちほど触れますが SQS イベントソースのバッチサイズを 1 にしているので、Records の最初の要素だけ取得するようにします。

1
2
3
4
5
var msg slackbot.QueueMessage
body := req.Records[0].Body
if err := json.Unmarshal([]byte(body), &msg); err != nil {
    return err
}

取り出したメッセージからテキストとタイムスタンプ値を渡してエージェントを呼び出します。

1
2
3
4
answer, err := invokeAgent(msg.InputText, msg.TimeStamp)
if err != nil {
    return err
}

エージェントからの回答を Slack に投稿します。せっかくの Bot なので、オプションを少し凝っています。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
opts := []slack.MsgOption{
    slack.MsgOptionBlocks(
        &slack.SectionBlock{
            Type: slack.MBTSection,
            Text: &slack.TextBlockObject{
                Type: slack.MarkdownType,
                Text: answer,
            },
        },
        &slack.DividerBlock{
            Type: slack.MBTDivider,
        },
        &slack.ContextBlock{
            Type: slack.MBTContext,
            ContextElements: slack.ContextElements{
                Elements: []slack.MixedElement{
                    &slack.TextBlockObject{
                        Type: slack.PlainTextType,
                        Text: slackbot.ContextMessage,
                    },
                },
            },
        },
    ),
    slack.MsgOptionTS(msg.TimeStamp),
}
if _, _, err := wr.slackClient.PostMessage(msg.ChannelID, opts...); err != nil {
    return err
}

ContextMessageここで定義していますが、こんな感じの見え方になります。

108466_1

最後に初期応答「回答を準備中…」を削除します。

1
2
3
if _, _, err := wr.slackClient.DeleteMessage(msg.InitialMessageChannelID, msg.InitialMessageTimeStamp); err != nil {
    return err
}

Dockerfile

両方の関数で同じ Dockerfile を使います。ARG NAME が肝です。

01
02
03
04
05
06
07
08
09
10
11
12
FROM golang:1.22.4-alpine3.20 as build
ARG HTTP_PROXY
ARG HTTPS_PROXY
ARG NAME
WORKDIR /slackbot
COPY go.mod go.sum utils.go ./
COPY ${NAME} ./${NAME}/
RUN HTTP_PROXY=${HTTP_PROXY} HTTPS_PROXY=${HTTPS_PROXY} go build -o main ${NAME}/main.go
 
FROM alpine:3.20
COPY --from=build /slackbot/main /main
ENTRYPOINT [ "/main" ]

以下のディレクトリ構成なので、CDK でビルドする際 buildArgsNAMEproducer を渡すか consumer を渡すかで切り替えます。

1
2
3
4
5
6
7
8
9
lib/image/slackbot
├── consumer
│   └── main.go
├── producer
│   └── main.go
├── Dockerfile
├── go.mod
├── go.sum
└── utils.go

余談ですが、Go はバイナリにランタイムも含まれるので、マルチステージビルドと相性がいい点も魅力ですね。OS イメージの所定のディレクトリにバイナリを置くだけで動きます。

CDK

資材は揃ったので、それをデプロイするための環境を整備します。

コード

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
.
├── bin
│   └── main.ts
├── lib
│   ├── constructs
│   │   ├── action.ts
│   │   ├── bedrock.ts
│   │   └── slackbot.ts
│   ├── image
│   │   ├── agent-go
│   │   │   ├── Dockerfile
│   │   │   ├── go.mod
│   │   │   ├── go.sum
│   │   │   └── main.go
│   │   └── slackbot
│   │       ├── consumer
│   │       │   └── main.go
│   │       ├── producer
│   │       │   └── main.go
│   │       ├── Dockerfile
│   │       ├── go.mod
│   │       ├── go.sum
│   │       └── utils.go
│   ├── instruction
│   │   └── instruction
│   ├── schema
│   │   └── schema.yaml
│   └── stack.ts
├── test
│   └── cdk-bedrock.test.ts
├── LICENSE
├── README.md
├── cdk-bedrock.code-workspace
├── cdk.context.json
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
├── parameter.template.ts
├── parameter.ts
└── tsconfig.json

SQS キュー

メッセージを受け渡すキューを作ります。特に変わったことはしていません。

1
2
3
4
5
6
7
8
9
const queue = new cdk.aws_sqs.Queue(this, "Queue", {
  queueName: `${props.serviceName}-slackbot-queue`,
  encryption: cdk.aws_sqs.QueueEncryption.SQS_MANAGED,
  enforceSSL: true,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
  retentionPeriod: cdk.Duration.days(7),
  visibilityTimeout: cdk.Duration.minutes(5),
  receiveMessageWaitTime: cdk.Duration.seconds(10),
});

Producer 関数

以下の点が特に重要です。

  • buildArgsNAME"producer" を渡すことで Dockerfile の ARG NAME に適用
  • queue.grantSendMessages() でキューにメッセージを送信する権限を IAM ロールに追加する
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
const producerRole = new cdk.aws_iam.Role(this, "ProducerRole", {
  roleName: `${props.serviceName}-slackbot-producer-role`,
  assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"),
  inlinePolicies: {
    SlackBotProducerRoleAdditionalPolicy: new cdk.aws_iam.PolicyDocument({
      statements: [
        new cdk.aws_iam.PolicyStatement({
          effect: cdk.aws_iam.Effect.ALLOW,
          actions: ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
          resources: ["arn:aws:logs:*:*:*"],
        }),
      ],
    }),
  },
});
 
const producerFn = new cdk.aws_lambda.DockerImageFunction(this, "ProducerFunction", {
  functionName: `${props.serviceName}-slackbot-producer`,
  description: `${props.serviceName}-slackbot-producer`,
  code: cdk.aws_lambda.DockerImageCode.fromImageAsset("lib/image/slackbot", {
    buildArgs: {
      NAME: "producer",
      HTTP_PROXY: props.httpProxy,
      HTTPS_PROXY: props.httpProxy,
    },
  }),
  architecture: cdk.aws_lambda.Architecture.ARM_64,
  role: producerRole,
  logRetention: cdk.aws_logs.RetentionDays.THREE_DAYS,
  currentVersionOptions: {
    removalPolicy: cdk.RemovalPolicy.RETAIN,
  },
  timeout: cdk.Duration.seconds(3),
  environment: {
    QUEUE_URL: queue.queueUrl,
    SLACK_OAUTH_TOKEN: props.slackOAuthToken,
    SLACK_SIGNING_SECRET: props.slackSigningSecret,
  },
});
 
const producerAlias = new cdk.aws_lambda.Alias(this, "ProducerAlias", {
  aliasName: "live",
  version: producerFn.currentVersion,
});
queue.grantSendMessages(producerAlias);

タイムアウト値については、3 秒以内で処理が完了した場合でも行き戻りの時間を加味した場合 Slack 側ではリトライ対象になったりしますが、そこはあまりこだわらずに 3 秒としています。

Consumer 関数

Consumer 側はタイムアウト値を大きめにしておきます。

  • buildArgsName"consumer" を渡すことで Dockerfile の ARG NAME に適用
  • エージェントを呼び出すので、Lambda のタイムアウトは余裕のある値を設定
  • consumerAlias.addEventSource() で Lambda 関数のイベントソースに SQS キューを設定
  • バッチサイズは今回 1 に設定
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const consumerFn = new cdk.aws_lambda.DockerImageFunction(this, "ConsumerFunction", {
  functionName: `${props.serviceName}-slackbot-consumer`,
  description: `${props.serviceName}-slackbot-consumer`,
  code: cdk.aws_lambda.DockerImageCode.fromImageAsset("lib/image/slackbot", {
    buildArgs: {
      NAME: "consumer",
      HTTP_PROXY: props.httpProxy,
      HTTPS_PROXY: props.httpProxy,
    },
  }),
  architecture: cdk.aws_lambda.Architecture.ARM_64,
  role: consumerRole,
  logRetention: cdk.aws_logs.RetentionDays.THREE_DAYS,
  currentVersionOptions: {
    removalPolicy: cdk.RemovalPolicy.RETAIN,
  },
  timeout: cdk.Duration.minutes(5),
  environment: {
    AGENT_ID: props.agent.agentId,
    AGENT_ALIAS_ID: props.agent.aliasId!,
    QUEUE_URL: queue.queueUrl,
    SLACK_OAUTH_TOKEN: props.slackOAuthToken,
    SLACK_SIGNING_SECRET: props.slackSigningSecret,
  },
});
 
const consumerAlias = new cdk.aws_lambda.Alias(this, "ConsumerAlias", {
  aliasName: "live",
  version: consumerFn.currentVersion,
});
queue.grantConsumeMessages(consumerAlias);
 
consumerAlias.addEventSource(
  new cdk.aws_lambda_event_sources.SqsEventSource(queue, {
    batchSize: 1,
    maxConcurrency: 2,
  })
);

ACM 証明書

API Gateway にカスタムドメインを設定するため、証明書を作ります。

なぜカスタムドメインを設定するかというと、Slack API 側でのエンドポイント URL の設定を固定にしたいからです。Slack/AWS 間がより疎結合となり、トライアンドエラーしやすくなります。

01
02
03
04
05
06
07
08
09
10
const hostedZone = cdk.aws_route53.HostedZone.fromLookup(this, "HostedZone", {
  domainName: props.hostZoneName,
});
 
const certificate = new cdk.aws_certificatemanager.Certificate(this, "Certificate", {
  certificateName: `${props.serviceName}-cert`,
  domainName: props.domainName,
  subjectAlternativeNames: ["*." + props.domainName],
  validation: cdk.aws_certificatemanager.CertificateValidation.fromDns(hostedZone),
});

API Gateway

API Gateway (HTTP API) を作成し、エンドポイント slack/callback を生やします。カスタムドメインを使うので、デフォルトのエンドポイントは無効にしています。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
const domainName = new cdk.aws_apigatewayv2.DomainName(this, "Domain", {
  domainName: props.domainName,
  certificate: certificate,
});
 
const apiName = `${props.serviceName}-slackbot-gateway`;
const api = new cdk.aws_apigatewayv2.HttpApi(this, "API", {
  apiName: apiName,
  description: apiName,
  disableExecuteApiEndpoint: true,
  defaultDomainMapping: {
    domainName: domainName,
  },
});
 
api.addRoutes({
  methods: [cdk.aws_apigatewayv2.HttpMethod.ANY],
  path: "/slack/callback",
  integration: new cdk.aws_apigatewayv2_integrations.HttpLambdaIntegration("Callback", producerAlias),
});

ログを有効にしますが、そのためには現状エスケープハッチが必要です (参考)

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
const logGroup = new cdk.aws_logs.LogGroup(this, "LogGroup", {
  logGroupName: `/aws/apigateway/${apiName}`,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
  retention: cdk.aws_logs.RetentionDays.THREE_DAYS,
});
 
const defaultStage = api.defaultStage?.node.defaultChild as cdk.aws_apigatewayv2.CfnStage;
defaultStage.accessLogSettings = {
  destinationArn: logGroup.logGroupArn,
  format: JSON.stringify({
    requestId: "$context.requestId",
    ip: "$context.identity.sourceIp",
    caller: "$context.identity.caller",
    user: "$context.identity.user",
    requestTime: "$context.requestTime",
    httpMethod: "$context.httpMethod",
    resourcePath: "$context.resourcePath",
    status: "$context.status",
    protocol: "$context.protocol",
    responseLength: "$context.responseLength",
  }),
};

最後にカスタムドメインを Route 53 に登録します。これで、いくらスタックを作り直しても Slack API 側の URL 設定を変更する必要がなくなりました。

01
02
03
04
05
06
07
08
09
10
11
const aRecord = new cdk.aws_route53.ARecord(this, "ARecord", {
  recordName: props.domainName,
  target: cdk.aws_route53.RecordTarget.fromAlias(
    new cdk.aws_route53_targets.ApiGatewayv2DomainProperties(
      domainName.regionalDomainName,
      domainName.regionalHostedZoneId
    )
  ),
  zone: hostedZone,
});
aRecord.node.addDependency(api);

おわりに

次回はインストラクションのチューニングを紹介し、実際に Slackbot の動作確認してみます。