はじめに

前回は Agents for Amazon Bedrock を使い、自然言語による指示で EC2 インスタンスの情報収集をしてみました。構成としては、機能ごとに Lambda 関数を作成してそれぞれ 1 対 1 でアクショングループに紐付け、自然言語による指示で関数を呼び分けました。図で表すと以下のような形です。

104504_1

今回はこれを次のように修正しました。機能をすべて単一の Lambda 関数に統合し、API スキーマの paths を指示文で指定することで機能を呼び分けます。変更後のコードはブランチを切って公開しています。

104504_2

Lambda

コード

やることは単純で、run_with_regions の内部で event["apiPath"] を使って分岐させます。

import json

import boto3

client = boto3.client("ec2")
regions = client.describe_regions()["Regions"]


def handler(event, context):
    body = run_with_regions(event["apiPath"])
    response = {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event["actionGroup"],
            "apiPath": event["apiPath"],
            "httpMethod": event["httpMethod"],
            "httpStatusCode": 200,
            "responseBody": {"application/json": {"body": body}},
        },
    }
    return response


def run_with_regions(apiPath):
    global results
    results = []
    for region in regions:
        if apiPath == "/count": # <- この if で分岐させる
            get_instances_count(region["RegionName"])
        elif apiPath == "/check-without-owner":
            get_instances_without_owner(region["RegionName"])
        elif apiPath == "/check-open-permission":
            get_instances_with_open_permission(region["RegionName"])
        else:
            print('Error: apiPath "{}" not supported'.format(apiPath))
            break
    return json.dumps(obj=results, ensure_ascii=False)


# 機能 1
def get_instances_count(region_name: str):
    try:
        client = boto3.client("ec2", region_name=region_name)
        instances = client.describe_instances()
        instance_count = 0
        instance_running = 0
        for reservation in instances["Reservations"]:
            instance_count += len(reservation["Instances"])
            for instance in reservation["Instances"]:
                if instance["State"]["Name"] == "running":
                    instance_running += 1
        result = {
            "region": region_name,
            "instance_count": instance_count,
            "instance_running": instance_running,
        }
        results.append(result)
    except Exception as e:
        print("Error: {}: {}".format(region_name, e))


# 機能 2
def get_instances_without_owner(region_name: str):
    try:
        client = boto3.client("ec2", region_name=region_name)
        instances = client.describe_instances()
        for reservation in instances["Reservations"]:
            for instance in reservation["Instances"]:
                if get_instance_tag_value("Owner", instance) == "":
                    result = {
                        "region": region_name,
                        "instance_id": instance["InstanceId"],
                        "instance_name": get_instance_tag_value("Name", instance),
                    }
                    results.append(result)
    except Exception as e:
        print("Error: {}: {}".format(region_name, e))


# 機能 3
def get_instances_with_open_permission(region_name: str):
    try:
        client = boto3.client("ec2", region_name=region_name)
        security_groups = client.describe_security_groups(
            Filters=[{"Name": "ip-permission.cidr", "Values": ["0.0.0.0/0"]}]
        )
        if not security_groups["SecurityGroups"]:
            return None
        instances = client.describe_instances(
            Filters=[
                {
                    "Name": "instance.group-id",
                    "Values": [
                        security_group["GroupId"]
                        for security_group in security_groups["SecurityGroups"]
                    ],
                }
            ]
        )
        for reservation in instances["Reservations"]:
            for instance in reservation["Instances"]:
                permissions = []
                for security_group in instance["SecurityGroups"]:
                    security_group_details = client.describe_security_groups(
                        GroupIds=[security_group["GroupId"]]
                    )
                    for security_group_detail in security_group_details[
                        "SecurityGroups"
                    ]:
                        for permission in security_group_detail["IpPermissions"]:
                            for ip_range in permission.get("IpRanges"):
                                if ip_range.get("CidrIp") == "0.0.0.0/0":
                                    permission_detail = {
                                        "protocol": permission.get("IpProtocol"),
                                        "from_port": permission.get("FromPort"),
                                        "to_port": permission.get("ToPort"),
                                        "allow_from": security_group["GroupName"],
                                    }
                                    permissions.append(permission_detail)
                if permissions:
                    result = {
                        "region": region_name,
                        "instance_id": instance["InstanceId"],
                        "instance_name": get_instance_tag_value("Name", instance),
                        "permissions": permissions,
                    }
                    results.append(result)
    except Exception as e:
        print("Error: {}: {}".format(region_name, e))


def get_instance_tag_value(key, instance) -> str:
    return next(
        (
            tag["Value"]
            for tag in instance.get("Tags")
            if tag["Key"].lower() == key.lower()
        ),
        "",
    )

API スキーマ

API スキーマ定義では機能の分だけ paths を切ります。

openapi: 3.0.0
info:
  title: Lambda
  version: 1.0.0
paths:
  /count:
    get:
      summary: Check EC2 instances count
      description: リージョンごとにインスタンスの総数、実行中のインスタンスの数を集計してJSONで返します。
      operationId: get_instances_count
      responses:
        "200":
          description: インスタンス数の確認が成功しました。
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    region:
                      type: string
                      description: リージョン名
                    instance_count:
                      type: integer
                      description: インスタンスの総数
                    instance_running:
                      type: integer
                      description: 実行中のインスタンスの数
  /check-without-owner:
    get:
      summary: Check EC2 instances without owner tag
      description: リージョンごとにOwnerタグが付与されているか確認し、リージョン名、インスタンスID、インスタンス名を収集してJSONで返します。
      operationId: get_instances_without_owner
      responses:
        "200":
          description: Ownerタグが付与されていないインスタンスの存在確認が成功しました。
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    region:
                      type: string
                      description: リージョン名
                    instance_id:
                      type: string
                      description: インスタンスID
                    instance_name:
                      type: string
                      description: インスタンス名
  /check-open-permission:
    get:
      summary: Check EC2 instances with open permission
      description: リージョンごとにインバウンド通信で0.0.0.0/0が許可されているインスタンスがあるかを確認し、リージョン名、インスタンスID、インスタンス名、許可されているプロトコル、開始ポート、終了ポート、どのセキュリティグループからの許可かを示すセキュリティグループ名を収集してJSONで返します。
      operationId: get_instances_with_open_permission
      responses:
        "200":
          description: インバウンド通信が解放されたインスタンスの存在確認が成功しました。
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    region:
                      type: string
                      description: リージョン名
                    instance_id:
                      type: string
                      description: インスタンスID
                    instance_name:
                      type: string
                      description: インスタンス名
                    permissions:
                      type: array
                      items:
                        type: object
                        properties:
                          protocol:
                            type: string
                            description: プロトコル
                          from_port:
                            type: string
                            description: 開始ポート
                          to_port:
                            type: string
                            description: 終了ポート
                          allow_from:
                            type: string
                            description: どのセキュリティグループからの許可かを示すセキュリティグループ名

CDK コード

Lambda 関数が統合され、lib/image/agent/src 配下がすっきりしました。前回と比べて関数がひとつになった以上の変更はないので、instruction だけピックアップして紹介します。

.
├── LICENSE
├── README.md
├── bin
│   └── main.ts
├── cdk-bedrock.code-workspace
├── cdk.context.json
├── cdk.example.json
├── cdk.json
├── docs
│   └── diagram.png
├── jest.config.js
├── lib
│   ├── constructs
│   │   ├── bedrock.ts
│   │   ├── ecs.ts
│   │   └── function.ts
│   ├── image
│   │   ├── agent
│   │   │   ├── README.md
│   │   │   ├── pyproject.toml
│   │   │   ├── requirements-dev.lock
│   │   │   ├── requirements.lock
│   │   │   └── src
│   │   │       └── agent
│   │   │           ├── Dockerfile
│   │   │           ├── agent.py
│   │   │           ├── requirements.txt
│   │   │           └── schema.yaml
│   │   └── app
│   │       ├── README.md
│   │       ├── pyproject.toml
│   │       ├── requirements-dev.lock
│   │       ├── requirements.lock
│   │       └── src
│   │           └── app
│   │               ├── Dockerfile
│   │               ├── app.py
│   │               └── requirements.txt
│   └── stack.ts
├── package-lock.json
├── package.json
├── test
│   └── cdk-bedrock.test.ts
└── tsconfig.json

Instruction

自然言語での指示内容を、関数名による指示から paths を使った指示に変更しました。

あなたはAWSに精通したソリューションアーキテクトです。いくつかの関数を使い分け、ユーザーの要求に日本語で回答してください。
ただし、コードやデータ構造については日本語ではなくそのまま回答してください。

タスク1:
もし、例えば「インスタンスの数を教えてください」というように、インスタンスの数について聞かれたら、”/count”というapiPathに紐づく機能を呼び出してください。

タスク2:
もし、例えば「Ownerタグの付与されていないインスタンスの情報を教えてください」というように、Ownerタグが付与されていないインスタンスの有無について聞かれたら、”/check-without-owner”というapiPathに紐づく機能を呼び出してください。

タスク3:
もし、例えば「インバウンド通信で0.0.0.0/0が許可されているインスタンスの情報を教えてください」というように、インバウンド通信が解放されたインスタンスの有無について聞かれたら、”/check-open-permission”というapiPathを呼び出してください。

タスク4:
タスク1、タスク2、タスク3以外の場合は、Lambda関数を実行せずに一般的な質問への回答をしてください。わからない質問には「その質問には回答できません」と回答してください

出力形式:
出力形式の指示が特にない場合は、項目を網羅して整形し、リスト形式で結果を返してください。
例えば「マークダウンの表で回答してください。」や「結果のJSONをそのまま返してください。」など、ユーザーから出力形式が指定された場合に限って、指示に合わせた回答をしてください。

特記事項:
関数の実行結果に関する質問をユーザーから追加で受けた場合は、再度同様の関数を実行することなく、前回の結果を参照し、文脈にあった回答をしてください。

検証

では試してみます。今回は Streamlit ではなく、マネコンでトレース内容も含めて確認します。

まずは「インスタンス数の取得」を指示してみましょう。出力が整形されないので見づらいですが、適切に質問を理解し、機能の呼び分けができています。

104504_3

では「Owner タグのないインスタンスの取得」を指示してみましょう。こちらも問題なく適切な apiPath を参照できています。

104504_4

最後に「0.0.0.0/0 が許可されたインスタンスの取得」を指示してみましょう。こちらも問題ありません。apiPath で指示して分岐させる方法は有効なようですね!

104504_5

おわりに

単一の関数から apiPath で複数の機能を呼び分けられることを確認しました。今回はすべての機能がパラメーターを必要としないものだったので単純でしたが、機能ごとにパラメーターの数が異なる場合はもう少し複雑化すると思います。