はじめに

EC2 インスタンスの情報収集を題材に Agents for Amazon Bedrock を触っていますが、今回は Slack でやりとりできるように Bot 化してみました。リポジトリはこちらです。これまでの経緯は過去記事をご覧ください。

なお私は Slackbot を開発する上で注意すべき以下のポイントをすべて踏み抜きました。

  • Bot が自分の投稿に反応して無限ループ
  • Events API のリトライ仕様にハマる
  • 特定のイベントを無視する条件を入れ忘れ同じ内容が何度も投稿される

構成

以下のような構成です。SQS を挟んで非同期にしてみました。

108124_diagram

  • ユーザーが Slackbot にメンションまたはダイレクトメッセージを送信する
  • API Gateway でリクエストを受ける (Slack/AWS 間を疎結合にしたいため、カスタムドメインを設定)
  • API Gateway の Lambda 関数 (1) がリクエストに対して初期応答「回答を準備中…」を返す
  • 同じ関数 (1) がリクエストをキューに入れる
  • キューの Lambda 関数 (2) がメッセージから指示を取り出しエージェントを呼び出す
  • エージェントが指示に沿ってアクションに紐づいた Lambda 関数 (3) を実行して結果を取得する
  • 結果を Slack に送信する
  • 不要となった初期応答をキューの Lambda 関数 (2) が削除する

Slackbot

1
2
3
4
5
6
7
8
9
lib/image/slackbot
├── consumer
│   └── main.go
├── producer
│   └── main.go
├── Dockerfile
├── go.mod
├── go.sum
└── utils.go

Slackbot のロジックを担う Lambda 関数です。便宜上、図中 1 の関数を Producer、2 の関数を Consumer と呼ぶことにします。

ファイル 図中の番号 やること
utils.go 共通で使う構造体や定数を定義する
producer/main.go 1 1. リクエストを検証する
2. 初期応答を返す
3. リクエストをキューに入れる
consumer/main.go 2 1. リクエストをキューから出す
2. エージェントを呼び出す
3. 結果を送信する
4. 初期応答を削除する

共通定義

構造上、どちらの関数からも使用できる構造体や定数を定義する必要があります。zip 形式の Lambda 関数ではレイヤーを使うのが妥当ですが、今回は Go のコンテナイメージを使用するため内部パッケージを切ってインポートすることでコードの重複を防ぎます。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
package slackbot
 
const (
    InitialMessage = ":loading2: 回答を準備中..."
    ContextMessage = ":ballot_box_with_check: スレッド内で会話履歴を保持しますが、一定時間入力がない場合はクリアされます。"
)
 
type QueueMessage struct {
    ChannelID               string
    TimeStamp               string
    InitialMessageChannelID string
    InitialMessageTimeStamp string
    InputText               string
}

この QueueMessage をメッセージ受け渡しの器として使います。Consumer 関数で必要になる以下の情報をフィールドとして定義しています。

フィールド 用途
ChannelID 投稿先チャンネルの識別
TimeStamp スレッドの識別
InvokeAgent に渡すセッション IDとしても使用
InitialMessageChannelID 初期応答を削除する時に使う ChannelID
InitialMessageTimeStamp 初期応答を削除する時に使う TimeStamp
InputText ユーザーメッセージ

Producer 関数

全体像

タイムアウトのハンドリング

Slack の Events API は公式ドキュメントにある通り、サーバーからの応答が 3 秒以上かかると自動でリトライします。エージェントの呼び出しとは相性が悪いので、今回は図の通り SQS を挟んで非同期にしてみました。Producer が初期応答を返してリトライ条件をパスするため、Consumer はリトライを気にする必要がなくなります。

が、初期応答であっても Lambda のコールドスタート時は以下をすべて考慮する必要があり、この制限に引っかかる可能性は残ります。

  • リクエストが到達するまでの時間
  • 初期化する時間 (ここがネックになる)
  • 処理する時間
  • 応答が戻ってくるまでの時間

この対策として、リトライ発動の理由がタイムアウトである場合に限ってそのリトライをスキップします。具体的にはリクエストに x-slack-retry-reason ヘッダーがあり、かつ値が http_timeout の場合はその時点で 200 OK を返します。

1
2
3
if reason, ok := req.Headers["x-slack-retry-reason"]; ok && reason == "http_timeout" {
    return doOK("ok", "info: skip retrying due to http_timeout"), nil
}

doOK は 200 を返しつつ CloudWatch Logs に書き込むためのヘルパー関数です。

1
2
3
4
5
6
7
8
func doOK(body, msg string) *events.APIGatewayProxyResponse {
    fmt.Println(msg)
    return &events.APIGatewayProxyResponse{
        Headers:    map[string]string{"Content-Type": "text"},
        Body:       body,
        StatusCode: http.StatusOK,
    }
}

リクエスト検証

リクエストを受けたらまずその正当性を検証します。以下の流れです。

  • リクエストヘッダーを http.Header 型に変換する
  • x-slack-signature ヘッダーにはリクエストボディを SLACK_SIGNING_SECRET で HMAC SHA256 ハッシュした値が入っている
  • リクエストヘッダーと SLACK_SIGNING_SECRET を渡して slack.SecretVerifier オブジェクトを作る
  • Writeslack.SecretVerifier にリクエストボディを書き込むことでリクエストの内容がハッシュ化される
  • Ensurex-slack-signature ヘッダーの値と計算したハッシュ値を比較する
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
func doSecretsVerification(req events.APIGatewayProxyRequest) error {
    header := http.Header{}
    for k, v := range req.Headers {
        header.Set(k, v)
    }
 
    sv, err := slack.NewSecretsVerifier(header, envs["SLACK_SIGNING_SECRET"])
    if err != nil {
        return err
    }
 
    if _, err := sv.Write([]byte(req.Body)); err != nil {
        return err
    }
 
    if err := sv.Ensure(); err != nil {
        return err
    }
 
    fmt.Println("success: verify signature")
    return nil
}

イベントのパース

slackevents.ParseEvent() でイベントをパースして Go で扱えるようにします。この時点で Verification Token を使った検証もできますが、現在では非推奨となっているので slackevents.OptionNoVerifyToken() を渡してスキップします。

1
2
3
4
5
body := req.Body
event, err := slackevents.ParseEvent(json.RawMessage(body), slackevents.OptionNoVerifyToken())
if err != nil {
    return nil, err
}

URL 検証

本記事では Slack API 側の設定を省略していますが、Slack の Events API を有効化するにはイベントサブスクリプションに URL を登録する必要があります。その際 URL を検証するために、Slack から送られてくるリクエストに対して以下の処理を行う必要があります。

  • リクエストから challenge 属性値を取り出す
  • その challenge 属性値を使ってレスポンスを 200 OK で返す
1
2
3
4
5
6
7
if event.Type == slackevents.URLVerification {
    var r *slackevents.ChallengeResponse
    if err := json.Unmarshal([]byte(body), &r); err != nil {
        return nil, err
    }
    return doOK(r.Challenge, "success: verify url"), nil
}

コールバックイベントの振り分け

今回は Bot にメンションした場合と Bot にダイレクトメッセージを投稿した場合に動作させる設計です。以下ではコールバックイベントの中身を型スイッチで調べ、メンションの場合とメッセージの場合で処理を分岐させています。

01
02
03
04
05
06
07
08
09
10
11
12
13
if event.Type == slackevents.CallbackEvent {
    switch v := event.InnerEvent.Data.(type) {
    case *slackevents.AppMentionEvent:
        if err := doAppMentionEvent(v); err != nil {
            return nil, err
        }
    case *slackevents.MessageEvent:
        if err := doMessageEvent(v); err != nil {
            return nil, err
        }
    default:
    }
}

メンションの処理

各イベントの処理ではのちほど説明する doSend 関数にチャンネル ID、タイムスタンプ、ユーザーメッセージを渡します。とりわけタイムスタンプが重要で、スレッドを特定する識別子として使います。

まず ThreadTimeStamp が空かどうかを確認します。空の場合、それがスレッド化されていないメッセージ、つまりユーザーからの最初のメッセージと判断できるため、変数 ts にメッセージ自体の識別子である TimeStamp を設定します。

他方 ThreadTimeStamp が空でない場合はスレッドにぶら下がったメッセージと判断できるので、変数 ts にスレッドの識別子である ThreadTimeStamp を設定します。

こうすることで ts にはスレッドの元になった最初のユーザーメッセージの識別子が常に格納され、チャンネル ID + タイムスタンプでスレッドを一意に識別できるようになります。

1
2
3
4
5
6
7
func doAppMentionEvent(event *slackevents.AppMentionEvent) error {
    ts := event.ThreadTimeStamp
    if ts == "" {
        ts = event.TimeStamp
    }
    return doSend(event.Channel, ts, reply.ReplaceAllString(event.Text, ""))
}

また event.Text はメンションの場合、<@ID> message というようにメンション先の ID がプレフィックスとして付与されます。このプレフィックスは不要なので正規表現でマッチさせて ReplaceAllString() で剥ぎ取ります。

なお正規表現のコンパイルはコストが高いため、グローバルに一度だけ実行しています。

1
var reply = regexp.MustCompile(`<@[A-Za-z0-9\.-_]*>\s`)

ダイレクトメッセージの処理

イベントを無視するロジックを含む点でメンションと異なります。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
func doMessageEvent(event *slackevents.MessageEvent) error {
    if event.ChannelType != slack.TYPE_IM ||
        event.BotID != "" ||
        event.SubType == slack.MsgSubTypeMessageChanged ||
        event.SubType == slack.MsgSubTypeMessageDeleted {
        fmt.Println("info: skip non-covered event")
        return nil
    }
 
    ts := event.ThreadTimeStamp
    if ts == "" {
        ts = event.TimeStamp
    }
    return doSend(event.Channel, ts, event.Text)
}

メッセージイベントを扱う場合、以下のように注意が必要でした。無限ループ、メッセージの重複など、怪しい挙動はほぼこの部分で踏みました。。。

無視する条件 意味 条件を入れない場合のリスク
event.ChannelType != slack.TYPE_IM チャンネルがダイレクトメッセージではない場合 ダイレクトメッセージではない場合でも勝手に Bot が反応してしまう
event.BotID != "" Bot からのメッセージの場合 Bot が自分のメッセージに反応することで無限ループが発生してしまう
event.SubType == slack.MsgSubTypeMessageChanged 変更イベントの場合 メッセージがスレッド化されたタイミングで、そのイベントをトリガーとして再処理されてしまう
event.SubType == slack.MsgSubTypeMessageDeleted 削除イベントの場合 メッセージが削除されたタイミングで、そのイベントをトリガーとして再処理されてしまう

デバッグ時、Lambda 関数のマネコン画面ですぐにスロットリングできるように構えておきましょう。無限ループを堰き止めることができます。

108124_throttle

初期応答とキューへの送信

コアロジックです。初期応答「回答を準備中…」の投稿と、SQS キューへのメッセージ送信を行います。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func doSend(channelID, timestamp, text string) error {
    // ack
    opts := []slack.MsgOption{
        slack.MsgOptionText(messages.InitialMessage, false),
        slack.MsgOptionTS(timestamp),
    }
    id, ts, err := wr.slackClient.PostMessage(channelID, opts...)
    if err != nil {
        return err
    }
    fmt.Println("success: send initial message")
 
    // enqueue
    msg := slackbot.QueueMessage{
        ChannelID:               channelID,
        TimeStamp:               timestamp,
        InitialMessageChannelID: id,
        InitialMessageTimeStamp: ts,
        InputText:               strings.TrimSuffix(text, messages.ContextMessage),
    }
    body, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    in := &sqs.SendMessageInput{
        MessageBody: aws.String(string(body)),
        QueueUrl:    aws.String(envs["QUEUE_URL"]),
    }
    if _, err := wr.queueClient.SendMessage(wr.ctx, in); err != nil {
        return err
    }
    fmt.Println("success: enqueue message")
 
    return nil
}

初期応答

slack.MsgOptionTS() にユーザーメッセージのタイムスタンプを渡すことでスレッド化します。

キューへの送信

肝は slackbot.QueueMessage に必要な情報を詰めるところです。

  • ChannelID + TimeStamp があればスレッドを一意に識別できる
  • (*slack.Client).PostMessage の戻り値 id ts があればその投稿をあとから削除できる
  • InputText があればユーザーメッセージをエージェントに渡せる

おわりに

思いのほか長くなってしまったので、記事を 3 つに分割しました。

その 1

  • 概要
  • Slackbot: 共通定義
  • Slackbot: Producer

その 2

  • Slackbot: Consumer
  • AWS CDK によるインフラ構築

その 3

  • インストラクションチューニング
  • Slackbot 動作確認