はじめに

ひきつづき EC2 インスタンスの情報収集を題材に Agents for Amazon Bedrock を触っています。今回はアクショングループに紐づいた Lambda 関数を Python から Go に書き直してパフォーマンスを改善してみました。

以下を試してどの程度パフォーマンスが改善するか確認します。コードはこちらのブランチに置いています。

  • goroutine でリージョンごとに並行処理する
  • 以下の処理を init 関数で実行し、結果を再利用する
    • Config の初期化
    • EC2 クライアントの初期化
    • DescribeRegions API

これまでの経緯は過去記事をご覧ください。

改善後の Lambda 関数

全体像はこちら

.
├── Dockerfile
├── go.mod
├── go.sum
└── main.go

まず各メソッドを生やす構造体を定義し、EC2 のクライアントとリージョンのデフォルト値を保持するようにします。そしてグローバル変数を用意し、init 関数によって値が代入されるようにします。

var d *Describer

type Describer struct {
    ctx     context.Context
    client  *ec2.Client
    regions []string
}

init 関数

Go の init 関数はパッケージ内の定数と変数宣言が評価された後に実行されます。Lambda 関数の場合、ウォームスタート時には init は実行されず、コールドスタート時に実行された結果を再利用します。

これを利用して DescribeRegions を init 関数で行うことで API 実行のオーバーヘッドを軽減します。Config の初期化、EC2 クライアントの初期化も同様にコールドスタート時の実行結果を再利用するようにします。

init 関数は引数を取ることができず戻り値もないので、グローバル変数に結果を代入する必要があります。

func init() {
    d = &Describer{
        ctx: context.Background(),
    }
    cfg, err := config.LoadDefaultConfig(
        d.ctx,
        config.WithRetryMode(aws.RetryModeStandard),
        config.WithRetryMaxAttempts(10),
    )
    if err != nil {
        log.Fatal(err)
    }
    d.client = ec2.NewFromConfig(cfg)
    out, err := d.client.DescribeRegions(d.ctx, &ec2.DescribeRegionsInput{})
    if err != nil {
        log.Fatal(err)
    }
    for _, region := range out.Regions {
        d.regions = append(d.regions, *region.RegionName)
    }
}

全リージョンのインスタンス総数、実行中インスタンス数を調べる関数

API の機能を担う各メソッドはリージョンごとに並行で処理するように改善します。goroutine と channel を使ってオーソドックスに記述します。今回はページネーターも考慮しています。

type CountInfo struct {
    Region           string `json:"region"`
    TotalInstances   int    `json:"totalInstances"`
    RunningInstances int    `json:"runningInstances"`
}

func (d *Describer) GetInstancesCount() ([]CountInfo, error) {
    var wg sync.WaitGroup
    ich := make(chan CountInfo, len(d.regions))
    ech := make(chan error, 1)
    for _, region := range d.regions {
        region := region
        wg.Add(1)
        go func() {
            defer wg.Done()
            total := 0
            running := 0
            var token *string
            for {
                out, err := d.client.DescribeInstances(
                    d.ctx,
                    &ec2.DescribeInstancesInput{
                        NextToken: token,
                    },
                    func(o *ec2.Options) {
                        o.Region = region
                    },
                )
                if err != nil {
                    select {
                    case ech <- err:
                    default:
                    }
                    return
                }
                for _, r := range out.Reservations {
                    total += len(r.Instances)
                    for _, i := range r.Instances {
                        if i.State.Name == "running" {
                            running++
                        }
                    }
                }
                token = out.NextToken
                if token == nil {
                    break
                }
            }
            ich <- CountInfo{
                Region:           region,
                TotalInstances:   total,
                RunningInstances: running,
            }
        }()
    }
    go func() {
        wg.Wait()
        close(ich)
    }()
    info := []CountInfo{}
    for {
        select {
        case count, ok := <-ich:
            if !ok {
                return info, nil
            }
            info = append(info, count)
        case err := <-ech:
            return nil, err
        }
    }
}

全リージョンで Owner タグのついていないインスタンスを調べる関数

上記と同様の手法で改善します。

type InstanceInfo struct {
    Region       string                  `json:"region"`
    InstanceID   string                  `json:"instanceId"`
    InstanceName string                  `json:"instanceName"`
    State        types.InstanceStateName `json:"state"`
}

func (d *Describer) GetInstancesWithoutOwner() ([]InstanceInfo, error) {
    var wg sync.WaitGroup
    ich := make(chan InstanceInfo, runtime.NumCPU())
    ech := make(chan error, 1)
    for _, region := range d.regions {
        region := region
        wg.Add(1)
        go func() {
            defer wg.Done()
            var token *string
            for {
                out, err := d.client.DescribeInstances(
                    d.ctx,
                    &ec2.DescribeInstancesInput{
                        NextToken: token,
                    },
                    func(o *ec2.Options) {
                        o.Region = region
                    },
                )
                if err != nil {
                    select {
                    case ech <- err:
                    default:
                    }
                    return
                }
                for _, r := range out.Reservations {
                    for _, i := range r.Instances {
                        if getInstanceTagValue("Owner", i.Tags) == "" {
                            ich <- InstanceInfo{
                                Region:       region,
                                InstanceID:   aws.ToString(i.InstanceId),
                                InstanceName: getInstanceTagValue("Name", i.Tags),
                                State:        i.State.Name,
                            }
                        }
                    }
                }
                token = out.NextToken
                if token == nil {
                    break
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(ich)
    }()
    info := []InstanceInfo{}
    for {
        select {
        case i, ok := <-ich:
            if !ok {
                return info, nil
            }
            info = append(info, i)
        case err := <-ech:
            return nil, err
        }
    }
}
...
func getInstanceTagValue(key string, tags []types.Tag) string {
    for _, t := range tags {
        if t.Key != nil && strings.EqualFold(aws.ToString(t.Key), key) && t.Value != nil {
            return *t.Value
        }
    }
    return ""
}

全リージョンでインバウンドが解放された (0.0.0.0/0 が許可された) インスタンスを調べる関数

セキュリティグループの配列 (スライス) をループで回すオーバーヘッドを軽減するため、map を使います。

type PermissionInfo struct {
    IpProtocol string `json:"ipProtocol"`
    FromPort   int32  `json:"fromPort"`
    ToPort     int32  `json:"toPort"`
    AllowFrom  string `json:"allowFrom"`
}

type InstanceSecurityGroupInfo struct {
    Region       string                  `json:"region"`
    InstanceID   string                  `json:"instanceId"`
    InstanceName string                  `json:"instanceName"`
    State        types.InstanceStateName `json:"state"`
    Permissions  []PermissionInfo        `json:"permissions"`
}

func (d *Describer) GetInstancesWithOpenPermission() ([]InstanceSecurityGroupInfo, error) {
    var wg sync.WaitGroup
    ich := make(chan InstanceSecurityGroupInfo, runtime.NumCPU())
    ech := make(chan error, 1)
    for _, region := range d.regions {
        region := region
        wg.Add(1)
        go func() {
            defer wg.Done()
            sgmap, err := d.getOpenSecurityGroups(region)
            if err != nil {
                select {
                case ech <- err:
                default:
                }
                return
            }
            if len(sgmap) == 0 {
                return
            }
            var sgids []string
            for sgid := range sgmap {
                sgids = append(sgids, sgid)
            }
            var token *string
            for {
                out, err := d.client.DescribeInstances(
                    d.ctx,
                    &ec2.DescribeInstancesInput{
                        NextToken: token,
                        Filters: []types.Filter{
                            {
                                Name:   aws.String("instance.group-id"),
                                Values: sgids,
                            },
                        },
                    },
                    func(o *ec2.Options) {
                        o.Region = region
                    },
                )
                if err != nil {
                    select {
                    case ech <- err:
                    default:
                    }
                    return
                }
                for _, r := range out.Reservations {
                    for _, i := range r.Instances {
                        var permissions []PermissionInfo
                        for _, sg := range i.SecurityGroups {
                            if perms, ok := sgmap[aws.ToString(sg.GroupId)]; ok {
                                permissions = append(permissions, perms...)
                            }
                        }
                        ich <- InstanceSecurityGroupInfo{
                            Region:       region,
                            InstanceID:   aws.ToString(i.InstanceId),
                            InstanceName: getInstanceTagValue("Name", i.Tags),
                            State:        i.State.Name,
                            Permissions:  permissions,
                        }
                    }
                }
                token = out.NextToken
                if token == nil {
                    break
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(ich)
    }()
    info := []InstanceSecurityGroupInfo{}
    for {
        select {
        case i, ok := <-ich:
            if !ok {
                return info, nil
            }
            info = append(info, i)
        case err := <-ech:
            return nil, err
        }
    }
}

func (d *Describer) getOpenSecurityGroups(region string) (map[string][]PermissionInfo, error) {
    m := make(map[string][]PermissionInfo, defaultMapSize)
    var token *string
    for {
        openSgs, err := d.client.DescribeSecurityGroups(
            d.ctx,
            &ec2.DescribeSecurityGroupsInput{
                NextToken: token,
                Filters: []types.Filter{
                    {
                        Name:   aws.String("ip-permission.cidr"),
                        Values: []string{"0.0.0.0/0"},
                    },
                },
            },
            func(o *ec2.Options) {
                o.Region = region
            },
        )
        if err != nil {
            return nil, err
        }
        for _, sg := range openSgs.SecurityGroups {
            var permissions []PermissionInfo
            for _, p := range sg.IpPermissions {
                for _, ip := range p.IpRanges {
                    if aws.ToString(ip.CidrIp) == "0.0.0.0/0" {
                        permissions = append(permissions, PermissionInfo{
                            IpProtocol: aws.ToString(p.IpProtocol),
                            FromPort:   aws.ToInt32(p.FromPort),
                            ToPort:     aws.ToInt32(p.ToPort),
                            AllowFrom:  aws.ToString(sg.GroupName),
                        })
                    }
                }
            }
            m[aws.ToString(sg.GroupId)] = permissions
        }
        token = openSgs.NextToken
        if token == nil {
            break
        }
    }
    return m, nil
}

ハンドラー

ここからが Agents for Amazon Bedrock 固有の対応で、肝の部分です。github.com/aws/aws-lambda-go/events には Bedrock 関連イベントの型情報は現時点で存在しないため、自前で定義する必要があります。なお github.com/aws/aws-sdk-go-v2/service/bedrockagentruntime/types の型は Lambda の入出力には使えませんでした。

ResponseBody はキーが Content-Type で可変のためどのように型を定義するか迷いましたが、map をネストさせて対応しました。Body は実行する API によって型が決まるため any にしています。

type EventRequest struct {
    ActionGroup string `json:"actionGroup"`
    APIPath     string `json:"apiPath"`
    HTTPMethod  string `json:"httpMethod"`
    Parameters  []struct {
        Name  string `json:"name"`
        Type  string `json:"type"`
        Value string `json:"value"`
    } `json:"parameters"`
    ResponseBody map[string]map[string]any `json:"responseBody"`
}

type EventResponse struct {
    MessageVersion string   `json:"messageVersion"`
    Response       Response `json:"response"`
}

type Response struct {
    ActionGroup    string                    `json:"actionGroup"`
    APIPath        string                    `json:"apiPath"`
    HTTPMethod     string                    `json:"httpMethod"`
    HTTPStatusCode int                       `json:"httpStatusCode"`
    ResponseBody   map[string]map[string]any `json:"responseBody"`
}

以下はパラメーターから regions の値を取り出してスライスに変換するヘルパー関数です。値が空であれば、デフォルト値として init 関数で取得した DescribeRegions API の結果をそのまま使います。

func (d *Describer) getRegionNames(event EventRequest) {
    for _, parameter := range event.Parameters {
        if parameter.Name == "regions" && parameter.Value != "" {
            d.regions = strings.Split(parameter.Value, ",")
        }
    }
}

メインのハンドラーです。apiPath で機能を呼び分け、結果をシリアライズせずに lambda.Start に渡します。

func (d *Describer) handle(event *EventRequest) (*EventResponse, error) {
    fmt.Println("processing by golang")
    d.getRegionNames(*event)
    apiPath := event.APIPath
    var body any
    var err error
    switch apiPath {
    case "/count/{regions}":
        body, err = d.GetInstancesCount()
    case "/check-without-owner/{regions}":
        body, err = d.GetInstancesWithoutOwner()
    case "/check-open-permission/{regions}":
        body, err = d.GetInstancesWithOpenPermission()
    default:
        return nil, fmt.Errorf("api path \"%s\" not suppported", apiPath)
    }
    if err != nil {
        return nil, err
    }
    resp := &EventResponse{
        MessageVersion: "1.0",
        Response: Response{
            ActionGroup:    event.ActionGroup,
            APIPath:        event.APIPath,
            HTTPMethod:     event.HTTPMethod,
            HTTPStatusCode: 200,
            ResponseBody: map[string]map[string]any{
                "application/json": {
                    "body": body,
                },
            },
        },
    }
    result, err := json.Marshal(resp)
    if err != nil {
        return nil, err
    }
    fmt.Println(string(result))
    return resp, nil
}

func main() {
    lambda.Start(d.handle)
}

Dockerfile

CDK の lambda.DockerImageFunction に Dockerfile を渡して cdk deploy 時にビルドさせる構成は変更しません。以下のような Dockerfile にしました。

FROM golang:1.22.4-alpine3.20 as build
ARG HTTP_PROXY
ARG HTTPS_PROXY
WORKDIR /agent-go
COPY go.mod go.sum ./
COPY main.go .
RUN HTTP_PROXY=${HTTP_PROXY} HTTPS_PROXY=${HTTPS_PROXY} go build -o main main.go
FROM alpine:3.20
COPY --from=build /agent-go/main /main
ENTRYPOINT [ "/main" ]

公式ドキュメントに倣って alpine を使ってみました。いつもどおり弊社環境に合わせてプロキシの URL を外から渡しますが、Go の場合は HTTP_PROXY に加えて HTTPS_PROXY も設定することでうまくいきました。

検証

ではどの程度パフォーマンスが改善したか見ていきます。

Python

まずは従来の Python 製スクリプトの実行ログです。

106451_1

Duration Billed Duration Memory Size Max Memory Used Init Duration
Cold Start 21296.26 ms 23866 ms 128 MB 92 MB 2569.61 ms
Warm Start 21626.68 ms 21627 ms 128 MB 95 MB

雑に書いた直列のスクリプトでなんの最適化もしていないので、全リージョンを処理するのに 20 秒以上かかっています。またメモリも思いのほか消費しているようです。私が熟練の Pythonista だったらこんなことにはならなかったでしょう。

Go

次に Go 製コードの実行ログです。

106451_2

Duration Billed Duration Memory Size Max Memory Used Init Duration
Cold Start 1071.00 ms 2238 ms 128 MB 25 MB 1166.35 ms
Warm Start 452.11 ms 453 ms 128 MB 27 MB

もともとパフォーマンスの改善を目的としてリファクタリングしたため、すべての数値において大幅な改善が見られました。特に init 関数の効果でウォームスタート時の Duration がコールドスタート時の半分以下になっています。課金面でもかなりのアドバンテージがありますね。

重要

私はこの記事において Python よりも Go が優れているというつもりは全然ありません。私の場合 Python のコードを的確にチューニングする実力がなかったため、Go を使って書き直したほうがパフォーマンスが出ました。が、もし Python のまま徹底的にリファクタリングした場合によりよい結果となる可能性は否定できません。

おわりに

Agents for Amazon Bedrock のアクショングループに Go で書いた Lambda 関数を紐づけるパターンはこれまであまりなかったと思います。Bedrock 成分薄めですが、ご参考になれば幸いです。