はじめに
EC2 インスタンスの情報収集を題材に Agents for Amazon Bedrock を触っていますが、今回は Slack でやりとりできるように Bot 化してみました。リポジトリはこちらです。これまでの経緯は過去記事をご覧ください。
- Agents for Amazon Bedrock にインスタンスの情報を収集させる
- Agents for Amazon Bedrock で単一のアクショングループから複数の機能を呼び分ける
- Agents for Amazon Bedrock にパラメーターを配列で渡したい
- Agents for Amazon Bedrock から呼び出す Lambda 関数を Go で書き直して高速化してみた
なお私は Slackbot を開発する上で注意すべき以下のポイントをすべて踏み抜きました。
- Bot が自分の投稿に反応して無限ループ
- Events API のリトライ仕様にハマる
- 特定のイベントを無視する条件を入れ忘れ同じ内容が何度も投稿される
構成
以下のような構成です。SQS を挟んで非同期にしてみました。
- ユーザーが Slackbot にメンションまたはダイレクトメッセージを送信する
- API Gateway でリクエストを受ける (Slack/AWS 間を疎結合にしたいため、カスタムドメインを設定)
- API Gateway の Lambda 関数 (1) がリクエストに対して初期応答「回答を準備中…」を返す
- 同じ関数 (1) がリクエストをキューに入れる
- キューの Lambda 関数 (2) がメッセージから指示を取り出しエージェントを呼び出す
- エージェントが指示に沿ってアクションに紐づいた Lambda 関数 (3) を実行して結果を取得する
- 結果を Slack に送信する
- 不要となった初期応答をキューの Lambda 関数 (2) が削除する
Slackbot
lib/image/slackbot ├── consumer │ └── main.go ├── producer │ └── main.go ├── Dockerfile ├── go.mod ├── go.sum └── utils.go
Slackbot のロジックを担う Lambda 関数です。便宜上、図中 1 の関数を Producer、2 の関数を Consumer と呼ぶことにします。
ファイル | 図中の番号 | やること |
---|---|---|
utils.go |
– | 共通で使う構造体や定数を定義する |
producer/main.go |
1 | 1. リクエストを検証する 2. 初期応答を返す 3. リクエストをキューに入れる |
consumer/main.go |
2 | 1. リクエストをキューから出す 2. エージェントを呼び出す 3. 結果を送信する 4. 初期応答を削除する |
共通定義
構造上、どちらの関数からも使用できる構造体や定数を定義する必要があります。zip 形式の Lambda 関数ではレイヤーを使うのが妥当ですが、今回は Go のコンテナイメージを使用するため内部パッケージを切ってインポートすることでコードの重複を防ぎます。
package slackbot const ( InitialMessage = ":loading2: 回答を準備中..." ContextMessage = ":ballot_box_with_check: スレッド内で会話履歴を保持しますが、一定時間入力がない場合はクリアされます。" ) type QueueMessage struct { ChannelID string TimeStamp string InitialMessageChannelID string InitialMessageTimeStamp string InputText string }
この QueueMessage
をメッセージ受け渡しの器として使います。Consumer 関数で必要になる以下の情報をフィールドとして定義しています。
フィールド | 用途 |
---|---|
ChannelID |
投稿先チャンネルの識別 |
TimeStamp |
スレッドの識別InvokeAgent に渡すセッション IDとしても使用 |
InitialMessageChannelID |
初期応答を削除する時に使う ChannelID |
InitialMessageTimeStamp |
初期応答を削除する時に使う TimeStamp |
InputText |
ユーザーメッセージ |
Producer 関数
タイムアウトのハンドリング
Slack の Events API は公式ドキュメントにある通り、サーバーからの応答が 3 秒以上かかると自動でリトライします。エージェントの呼び出しとは相性が悪いので、今回は図の通り SQS を挟んで非同期にしてみました。Producer が初期応答を返してリトライ条件をパスするため、Consumer はリトライを気にする必要がなくなります。
が、初期応答であっても Lambda のコールドスタート時は以下をすべて考慮する必要があり、この制限に引っかかる可能性は残ります。
- リクエストが到達するまでの時間
- 初期化する時間 (ここがネックになる)
- 処理する時間
- 応答が戻ってくるまでの時間
この対策として、リトライ発動の理由がタイムアウトである場合に限ってそのリトライをスキップします。具体的にはリクエストに x-slack-retry-reason
ヘッダーがあり、かつ値が http_timeout
の場合はその時点で 200 OK を返します。
if reason, ok := req.Headers["x-slack-retry-reason"]; ok && reason == "http_timeout" { return doOK("ok", "info: skip retrying due to http_timeout"), nil }
doOK
は 200 を返しつつ CloudWatch Logs に書き込むためのヘルパー関数です。
func doOK(body, msg string) *events.APIGatewayProxyResponse { fmt.Println(msg) return &events.APIGatewayProxyResponse{ Headers: map[string]string{"Content-Type": "text"}, Body: body, StatusCode: http.StatusOK, } }
リクエスト検証
リクエストを受けたらまずその正当性を検証します。以下の流れです。
- リクエストヘッダーを
http.Header
型に変換する x-slack-signature
ヘッダーにはリクエストボディをSLACK_SIGNING_SECRET
で HMAC SHA256 ハッシュした値が入っている- リクエストヘッダーと
SLACK_SIGNING_SECRET
を渡してslack.SecretVerifier
オブジェクトを作る Write
でslack.SecretVerifier
にリクエストボディを書き込むことでリクエストの内容がハッシュ化されるEnsure
でx-slack-signature
ヘッダーの値と計算したハッシュ値を比較する
func doSecretsVerification(req events.APIGatewayProxyRequest) error { header := http.Header{} for k, v := range req.Headers { header.Set(k, v) } sv, err := slack.NewSecretsVerifier(header, envs["SLACK_SIGNING_SECRET"]) if err != nil { return err } if _, err := sv.Write([]byte(req.Body)); err != nil { return err } if err := sv.Ensure(); err != nil { return err } fmt.Println("success: verify signature") return nil }
イベントのパース
slackevents.ParseEvent()
でイベントをパースして Go で扱えるようにします。この時点で Verification Token を使った検証もできますが、現在では非推奨となっているので slackevents.OptionNoVerifyToken()
を渡してスキップします。
body := req.Body event, err := slackevents.ParseEvent(json.RawMessage(body), slackevents.OptionNoVerifyToken()) if err != nil { return nil, err }
URL 検証
本記事では Slack API 側の設定を省略していますが、Slack の Events API を有効化するにはイベントサブスクリプションに URL を登録する必要があります。その際 URL を検証するために、Slack から送られてくるリクエストに対して以下の処理を行う必要があります。
- リクエストから
challenge
属性値を取り出す - その
challenge
属性値を使ってレスポンスを 200 OK で返す
if event.Type == slackevents.URLVerification { var r *slackevents.ChallengeResponse if err := json.Unmarshal([]byte(body), &r); err != nil { return nil, err } return doOK(r.Challenge, "success: verify url"), nil }
コールバックイベントの振り分け
今回は Bot にメンションした場合と Bot にダイレクトメッセージを投稿した場合に動作させる設計です。以下ではコールバックイベントの中身を型スイッチで調べ、メンションの場合とメッセージの場合で処理を分岐させています。
if event.Type == slackevents.CallbackEvent { switch v := event.InnerEvent.Data.(type) { case *slackevents.AppMentionEvent: if err := doAppMentionEvent(v); err != nil { return nil, err } case *slackevents.MessageEvent: if err := doMessageEvent(v); err != nil { return nil, err } default: } }
メンションの処理
各イベントの処理ではのちほど説明する doSend
関数にチャンネル ID、タイムスタンプ、ユーザーメッセージを渡します。とりわけタイムスタンプが重要で、スレッドを特定する識別子として使います。
まず ThreadTimeStamp
が空かどうかを確認します。空の場合、それがスレッド化されていないメッセージ、つまりユーザーからの最初のメッセージと判断できるため、変数 ts
にメッセージ自体の識別子である TimeStamp
を設定します。
他方 ThreadTimeStamp
が空でない場合はスレッドにぶら下がったメッセージと判断できるので、変数 ts
にスレッドの識別子である ThreadTimeStamp
を設定します。
こうすることで ts
にはスレッドの元になった最初のユーザーメッセージの識別子が常に格納され、チャンネル ID + タイムスタンプでスレッドを一意に識別できるようになります。
func doAppMentionEvent(event *slackevents.AppMentionEvent) error { ts := event.ThreadTimeStamp if ts == "" { ts = event.TimeStamp } return doSend(event.Channel, ts, reply.ReplaceAllString(event.Text, "")) }
また event.Text
はメンションの場合、<@ID> message
というようにメンション先の ID がプレフィックスとして付与されます。このプレフィックスは不要なので正規表現でマッチさせて ReplaceAllString()
で剥ぎ取ります。
なお正規表現のコンパイルはコストが高いため、グローバルに一度だけ実行しています。
var reply = regexp.MustCompile(`<@[A-Za-z0-9\.-_]*>\s`)
ダイレクトメッセージの処理
イベントを無視するロジックを含む点でメンションと異なります。
func doMessageEvent(event *slackevents.MessageEvent) error { if event.ChannelType != slack.TYPE_IM || event.BotID != "" || event.SubType == slack.MsgSubTypeMessageChanged || event.SubType == slack.MsgSubTypeMessageDeleted { fmt.Println("info: skip non-covered event") return nil } ts := event.ThreadTimeStamp if ts == "" { ts = event.TimeStamp } return doSend(event.Channel, ts, event.Text) }
メッセージイベントを扱う場合、以下のように注意が必要でした。無限ループ、メッセージの重複など、怪しい挙動はほぼこの部分で踏みました。。。
無視する条件 | 意味 | 条件を入れない場合のリスク |
---|---|---|
event.ChannelType != slack.TYPE_IM |
チャンネルがダイレクトメッセージではない場合 | ダイレクトメッセージではない場合でも勝手に Bot が反応してしまう |
event.BotID != "" |
Bot からのメッセージの場合 | Bot が自分のメッセージに反応することで無限ループが発生してしまう |
event.SubType == slack.MsgSubTypeMessageChanged |
変更イベントの場合 | メッセージがスレッド化されたタイミングで、そのイベントをトリガーとして再処理されてしまう |
event.SubType == slack.MsgSubTypeMessageDeleted |
削除イベントの場合 | メッセージが削除されたタイミングで、そのイベントをトリガーとして再処理されてしまう |
デバッグ時、Lambda 関数のマネコン画面ですぐにスロットリングできるように構えておきましょう。無限ループを堰き止めることができます。
初期応答とキューへの送信
コアロジックです。初期応答「回答を準備中…」の投稿と、SQS キューへのメッセージ送信を行います。
func doSend(channelID, timestamp, text string) error { // ack opts := []slack.MsgOption{ slack.MsgOptionText(messages.InitialMessage, false), slack.MsgOptionTS(timestamp), } id, ts, err := wr.slackClient.PostMessage(channelID, opts...) if err != nil { return err } fmt.Println("success: send initial message") // enqueue msg := slackbot.QueueMessage{ ChannelID: channelID, TimeStamp: timestamp, InitialMessageChannelID: id, InitialMessageTimeStamp: ts, InputText: strings.TrimSuffix(text, messages.ContextMessage), } body, err := json.Marshal(msg) if err != nil { return err } in := &sqs.SendMessageInput{ MessageBody: aws.String(string(body)), QueueUrl: aws.String(envs["QUEUE_URL"]), } if _, err := wr.queueClient.SendMessage(wr.ctx, in); err != nil { return err } fmt.Println("success: enqueue message") return nil }
初期応答
slack.MsgOptionTS()
にユーザーメッセージのタイムスタンプを渡すことでスレッド化します。
キューへの送信
肝は slackbot.QueueMessage
に必要な情報を詰めるところです。
ChannelID
+TimeStamp
があればスレッドを一意に識別できる(*slack.Client).PostMessage
の戻り値id
ts
があればその投稿をあとから削除できるInputText
があればユーザーメッセージをエージェントに渡せる
おわりに
思いのほか長くなってしまったので、記事を 3 つに分割しました。
その 1
- 概要
- Slackbot: 共通定義
- Slackbot: Producer
その 2
- Slackbot: Consumer
- AWS CDK によるインフラ構築
その 3
- インストラクションチューニング
- Slackbot 動作確認