はじめに
前回は Agents for Amazon Bedrock を使い、自然言語による指示で EC2 インスタンスの情報収集をしてみました。構成としては、機能ごとに Lambda 関数を作成してそれぞれ 1 対 1 でアクショングループに紐付け、自然言語による指示で関数を呼び分けました。図で表すと以下のような形です。
今回はこれを次のように修正しました。機能をすべて単一の Lambda 関数に統合し、API スキーマの paths
を指示文で指定することで機能を呼び分けます。変更後のコードはブランチを切って公開しています。
Lambda
コード
やることは単純で、run_with_regions
の内部で event["apiPath"]
を使って分岐させます。
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | import json import boto3 client = boto3.client( "ec2" ) regions = client.describe_regions()[ "Regions" ] def handler(event, context): body = run_with_regions(event[ "apiPath" ]) response = { "messageVersion" : "1.0" , "response" : { "actionGroup" : event[ "actionGroup" ], "apiPath" : event[ "apiPath" ], "httpMethod" : event[ "httpMethod" ], "httpStatusCode" : 200 , "responseBody" : { "application/json" : { "body" : body}}, }, } return response def run_with_regions(apiPath): global results results = [] for region in regions: if apiPath = = "/count" : # <- この if で分岐させる get_instances_count(region[ "RegionName" ]) elif apiPath = = "/check-without-owner" : get_instances_without_owner(region[ "RegionName" ]) elif apiPath = = "/check-open-permission" : get_instances_with_open_permission(region[ "RegionName" ]) else : print ( 'Error: apiPath "{}" not supported' . format (apiPath)) break return json.dumps(obj = results, ensure_ascii = False ) # 機能 1 def get_instances_count(region_name: str ): try : client = boto3.client( "ec2" , region_name = region_name) instances = client.describe_instances() instance_count = 0 instance_running = 0 for reservation in instances[ "Reservations" ]: instance_count + = len (reservation[ "Instances" ]) for instance in reservation[ "Instances" ]: if instance[ "State" ][ "Name" ] = = "running" : instance_running + = 1 result = { "region" : region_name, "instance_count" : instance_count, "instance_running" : instance_running, } results.append(result) except Exception as e: print ( "Error: {}: {}" . format (region_name, e)) # 機能 2 def get_instances_without_owner(region_name: str ): try : client = boto3.client( "ec2" , region_name = region_name) instances = client.describe_instances() for reservation in instances[ "Reservations" ]: for instance in reservation[ "Instances" ]: if get_instance_tag_value( "Owner" , instance) = = "": result = { "region" : region_name, "instance_id" : instance[ "InstanceId" ], "instance_name" : get_instance_tag_value( "Name" , instance), } results.append(result) except Exception as e: print ( "Error: {}: {}" . format (region_name, e)) # 機能 3 def get_instances_with_open_permission(region_name: str ): try : client = boto3.client( "ec2" , region_name = region_name) security_groups = client.describe_security_groups( Filters = [{ "Name" : "ip-permission.cidr" , "Values" : [ "0.0.0.0/0" ]}] ) if not security_groups[ "SecurityGroups" ]: return None instances = client.describe_instances( Filters = [ { "Name" : "instance.group-id" , "Values" : [ security_group[ "GroupId" ] for security_group in security_groups[ "SecurityGroups" ] ], } ] ) for reservation in instances[ "Reservations" ]: for instance in reservation[ "Instances" ]: permissions = [] for security_group in instance[ "SecurityGroups" ]: security_group_details = client.describe_security_groups( GroupIds = [security_group[ "GroupId" ]] ) for security_group_detail in security_group_details[ "SecurityGroups" ]: for permission in security_group_detail[ "IpPermissions" ]: for ip_range in permission.get( "IpRanges" ): if ip_range.get( "CidrIp" ) = = "0.0.0.0/0" : permission_detail = { "protocol" : permission.get( "IpProtocol" ), "from_port" : permission.get( "FromPort" ), "to_port" : permission.get( "ToPort" ), "allow_from" : security_group[ "GroupName" ], } permissions.append(permission_detail) if permissions: result = { "region" : region_name, "instance_id" : instance[ "InstanceId" ], "instance_name" : get_instance_tag_value( "Name" , instance), "permissions" : permissions, } results.append(result) except Exception as e: print ( "Error: {}: {}" . format (region_name, e)) def get_instance_tag_value(key, instance) - > str : return next ( ( tag[ "Value" ] for tag in instance.get( "Tags" ) if tag[ "Key" ].lower() = = key.lower() ), "", ) |
API スキーマ
API スキーマ定義では機能の分だけ paths
を切ります。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | openapi: 3.0.0 info: title: Lambda version: 1.0.0 paths: /count: get: summary: Check EC2 instances count description: リージョンごとにインスタンスの総数、実行中のインスタンスの数を集計してJSONで返します。 operationId: get_instances_count responses: "200": description: インスタンス数の確認が成功しました。 content: application/json: schema: type: array items: type: object properties: region: type: string description: リージョン名 instance_count: type: integer description: インスタンスの総数 instance_running: type: integer description: 実行中のインスタンスの数 /check-without-owner: get: summary: Check EC2 instances without owner tag description: リージョンごとにOwnerタグが付与されているか確認し、リージョン名、インスタンスID、インスタンス名を収集してJSONで返します。 operationId: get_instances_without_owner responses: "200": description: Ownerタグが付与されていないインスタンスの存在確認が成功しました。 content: application/json: schema: type: array items: type: object properties: region: type: string description: リージョン名 instance_id: type: string description: インスタンスID instance_name: type: string description: インスタンス名 /check-open-permission: get: summary: Check EC2 instances with open permission description: リージョンごとにインバウンド通信で0.0.0.0/0が許可されているインスタンスがあるかを確認し、リージョン名、インスタンスID、インスタンス名、許可されているプロトコル、開始ポート、終了ポート、どのセキュリティグループからの許可かを示すセキュリティグループ名を収集してJSONで返します。 operationId: get_instances_with_open_permission responses: "200": description: インバウンド通信が解放されたインスタンスの存在確認が成功しました。 content: application/json: schema: type: array items: type: object properties: region: type: string description: リージョン名 instance_id: type: string description: インスタンスID instance_name: type: string description: インスタンス名 permissions: type: array items: type: object properties: protocol: type: string description: プロトコル from_port: type: string description: 開始ポート to_port: type: string description: 終了ポート allow_from: type: string description: どのセキュリティグループからの許可かを示すセキュリティグループ名 |
CDK コード
Lambda 関数が統合され、lib/image/agent/src
配下がすっきりしました。前回と比べて関数がひとつになった以上の変更はないので、instruction
だけピックアップして紹介します。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | . ├── LICENSE ├── README.md ├── bin │ └── main.ts ├── cdk-bedrock.code-workspace ├── cdk.context.json ├── cdk.example.json ├── cdk.json ├── docs │ └── diagram.png ├── jest.config.js ├── lib │ ├── constructs │ │ ├── bedrock.ts │ │ ├── ecs.ts │ │ └── function.ts │ ├── image │ │ ├── agent │ │ │ ├── README.md │ │ │ ├── pyproject.toml │ │ │ ├── requirements-dev.lock │ │ │ ├── requirements.lock │ │ │ └── src │ │ │ └── agent │ │ │ ├── Dockerfile │ │ │ ├── agent.py │ │ │ ├── requirements.txt │ │ │ └── schema.yaml │ │ └── app │ │ ├── README.md │ │ ├── pyproject.toml │ │ ├── requirements-dev.lock │ │ ├── requirements.lock │ │ └── src │ │ └── app │ │ ├── Dockerfile │ │ ├── app.py │ │ └── requirements.txt │ └── stack.ts ├── package-lock.json ├── package.json ├── test │ └── cdk-bedrock.test.ts └── tsconfig.json |
Instruction
自然言語での指示内容を、関数名による指示から paths
を使った指示に変更しました。
あなたはAWSに精通したソリューションアーキテクトです。いくつかの関数を使い分け、ユーザーの要求に日本語で回答してください。
ただし、コードやデータ構造については日本語ではなくそのまま回答してください。タスク1:
もし、例えば「インスタンスの数を教えてください」というように、インスタンスの数について聞かれたら、”/count”というapiPathに紐づく機能を呼び出してください。タスク2:
もし、例えば「Ownerタグの付与されていないインスタンスの情報を教えてください」というように、Ownerタグが付与されていないインスタンスの有無について聞かれたら、”/check-without-owner”というapiPathに紐づく機能を呼び出してください。タスク3:
もし、例えば「インバウンド通信で0.0.0.0/0が許可されているインスタンスの情報を教えてください」というように、インバウンド通信が解放されたインスタンスの有無について聞かれたら、”/check-open-permission”というapiPathを呼び出してください。タスク4:
タスク1、タスク2、タスク3以外の場合は、Lambda関数を実行せずに一般的な質問への回答をしてください。わからない質問には「その質問には回答できません」と回答してください出力形式:
出力形式の指示が特にない場合は、項目を網羅して整形し、リスト形式で結果を返してください。
例えば「マークダウンの表で回答してください。」や「結果のJSONをそのまま返してください。」など、ユーザーから出力形式が指定された場合に限って、指示に合わせた回答をしてください。特記事項:
関数の実行結果に関する質問をユーザーから追加で受けた場合は、再度同様の関数を実行することなく、前回の結果を参照し、文脈にあった回答をしてください。
検証
では試してみます。今回は Streamlit ではなく、マネコンでトレース内容も含めて確認します。
まずは「インスタンス数の取得」を指示してみましょう。出力が整形されないので見づらいですが、適切に質問を理解し、機能の呼び分けができています。
では「Owner タグのないインスタンスの取得」を指示してみましょう。こちらも問題なく適切な apiPath
を参照できています。
最後に「0.0.0.0/0 が許可されたインスタンスの取得」を指示してみましょう。こちらも問題ありません。apiPath
で指示して分岐させる方法は有効なようですね!
おわりに
単一の関数から apiPath
で複数の機能を呼び分けられることを確認しました。今回はすべての機能がパラメーターを必要としないものだったので単純でしたが、機能ごとにパラメーターの数が異なる場合はもう少し複雑化すると思います。