クラウドインテグレーション事業部 セキュリティセクションの日下です。
今回は、EventBridge とLambdaを使って、セキュリティグループの差分を抽出する方法を
簡単ではありますが記載していきます。
実施背景
社内環境にて、AWSを利用したシステムが複数存在しており、その中にはPCI DSS 4.0準拠が求められるシステムも含まれています。
PCI DSS 4.0では、要件1.2.7 にてネットワークセキュリティコントロール(NSC)の設定を少なくとも6ヶ月に1回は見直しすることが求められており、AWSにおいてはセキュリティグループがこれに該当します。
しかし、現実の運用ではセキュリティグループの設定が頻繁に追加・変更されるため、定期的な棚卸し時に「何がいつ・どのように変更されたか」を人手で追うのは現実的ではありませんでした。
そこで、Amazon EventBridge と AWS Lambda を組み合わせることで、セキュリティグループの差分を自動的に検出・記録する仕組みを構築し、レビュー作業の効率化と確実な証跡の確保を目指しました。
必要なAWSサービスと実装方法について
実装にあたり、以下のAWSサービスが必要になります。
- S3
- Lambda関数
- EventBridge
S3バケットの作成
下記の権限を許可したバケットポリシーを設定したバケットを作成します。
- PutObject:差分CSVや最新CSVをアップロードできる
- GetObject:前回取得分を読み込める
- ListBucket::バケット全体 最新ファイルを選ぶための検索ができる
{
“Version”: “2012-10-17”,
“Statement”: [
{
“Sid”: “AllowLambdaAccess”,
“Effect”: “Allow”,
“Principal”: {
“AWS”: [
“#Lambda関数に付与しているロールのarn”
]
},
“Action”: [
“s3:PutObject”,
“s3:GetObject”,
“s3:ListBucket”
],
“Resource”: [
“arn:aws:s3:::#格納先のバケット名”
]
}
]
}
Lambda関数の作成
セキュリティグループ設定を取得し、前回取得分と比較して差分を検出、CSV形式でS3に保存する関数を作成します。
主な処理の流れは以下です。
- S3から前回のセキュリティグループCSVを取得
- 現在のセキュリティグループ情報を取得
- 現在の情報をCSV化
- 差分があれば、差分CSVも生成
- 今回のCSVと(あれば)差分CSVをS3にアップロード
- 結果をログ+返却(JSON)
そして実際の関数は下記です。
import boto3
import csv
import datetime
import io
import jsondef upload_to_s3(bucket_name, file_name, data):
“””
データをS3にアップロードする関数
“””
s3 = boto3.client(‘s3′)
s3.put_object(
Bucket=bucket_name,
Key=file_name,
Body=data,
ContentType=’text/csv’
)
print(f”{file_name}を{bucket_name}にアップロードしました。”)def get_latest_sg_file(bucket_name, prefix):
“””
S3バケット内で最終更新日時が新しいファイルを取得
“””
s3 = boto3.client(‘s3’)
try:
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if ‘Contents’ not in response:
print(“対象のファイルが存在しません。”)
return Nonelatest_file = max(response[‘Contents’], key=lambda x: x[‘LastModified’])
latest_file_key = latest_file[‘Key’]# ファイルの内容を取得
file_response = s3.get_object(Bucket=bucket_name, Key=latest_file_key)
data = file_response[‘Body’].read().decode(‘utf-8’)
print(f”最新のファイルを取得: {latest_file_key}”)
return data
except Exception as e:
print(f”エラーが発生しました: {e}”)
return Nonedef convert_to_csv(security_groups):
“””
セキュリティグループのデータをCSV形式に変換
“””
output = io.StringIO()
writer = csv.writer(output)# ヘッダーの定義
header = [
“GroupName”, “GroupId”, “VpcId”, “Description”,
“Protocol”, “FromPort”, “ToPort”, “CidrIp”
]
writer.writerow(header)# データを行ごとに追加
for sg in security_groups:
group_name = sg.get(“GroupName”, “”)
group_id = sg.get(“GroupId”, “”)
vpc_id = sg.get(“VpcId”, “”)
description = sg.get(“Description”, “”)# IpPermissionsからルールを抽出
for permission in sg.get(“IpPermissions”, []):
protocol = permission.get(“IpProtocol”, “”)
from_port = permission.get(“FromPort”, “”)
to_port = permission.get(“ToPort”, “”)
for ip_range in permission.get(“IpRanges”, []):
cidr_ip = ip_range.get(“CidrIp”, “”)
writer.writerow([
group_name, group_id, vpc_id, description,
protocol, from_port, to_port, cidr_ip
])return output.getvalue()
def convert_diff_to_csv(added, removed, updated):
“””
差分データをCSV形式に変換
“””
output = io.StringIO()
writer = csv.writer(output)# ヘッダーの定義
header = [“ChangeType”, “GroupName”, “GroupId”, “VpcId”, “Description”, “Protocol”, “FromPort”, “ToPort”, “CidrIp”]
writer.writerow(header)# 追加データ
for row in added:
writer.writerow([“Added”] + row)# 削除データ
for row in removed:
writer.writerow([“Removed”] + row)# 更新データ
for change in updated:
writer.writerow([“Updated (Previous)”] + change[‘previous’])
writer.writerow([“Updated (Current)”] + change[‘current’])return output.getvalue()
def calculate_diff_by_fields(previous_data, current_data):
“””
差分を特定フィールドごとに計算
“””
previous_rows = csv.reader(io.StringIO(previous_data)) if previous_data else []
current_rows = csv.reader(io.StringIO(current_data))previous_list = list(previous_rows)[1:] if previous_data else []
current_list = list(current_rows)[1:]added = []
removed = []
updated = []current_dict = {
(row[0], row[5], row[6]): row for row in current_list
}previous_dict = {
(row[0], row[5], row[6]): row for row in previous_list
}for key, value in current_dict.items():
if key not in previous_dict:
added.append(value)
elif value != previous_dict[key]:
updated.append({“previous”: previous_dict[key], “current”: value})for key, value in previous_dict.items():
if key not in current_dict:
removed.append(value)return added, removed, updated
def lambda_handler(event, context):
region = “us-east-2”
bucket_name = “#アップ先ロードのバケット名”
file_path = “SGtest/”try:
jst = datetime.timezone(datetime.timedelta(hours=9))
current_time = datetime.datetime.now(jst).strftime(‘%Y-%m-%d %H:%M:%S’)previous_data = get_latest_sg_file(bucket_name, file_path)
ec2 = boto3.client(‘ec2’, region_name=region)
security_groups = ec2.describe_security_groups()[‘SecurityGroups’]
print(“取得したセキュリティグループ情報:”, security_groups)current_csv_data = convert_to_csv(security_groups)
print(f”生成されたCSVデータ:\n{current_csv_data[:500]}”)if previous_data:
added, removed, updated = calculate_diff_by_fields(previous_data, current_csv_data)
diff_csv_data = convert_diff_to_csv(added, removed, updated)
diff_file_name = f”{file_path}SG_diff_{datetime.datetime.now(jst).strftime(‘%Y%m%d_%H%M%S’)}.csv”
upload_to_s3(bucket_name, diff_file_name, diff_csv_data)update_time = datetime.datetime.now(jst).strftime(‘%Y-%m-%d_%H-%M-%S’)
current_file_name = f”{file_path}SG_current_{update_time}.csv”
upload_to_s3(bucket_name, current_file_name, current_csv_data)return {
‘statusCode’: 200,
‘body’: json.dumps({
“message”: “セキュリティグループデータをCSV形式でS3にアップロードしました”,
“current_file_name”: current_file_name,
“diff_file_name”: diff_file_name if previous_data else None
})
}
except Exception as e:
print(f”Error: {e}”)
return {
‘statusCode’: 500,
‘body’: str(e)
}
ざっくりLambdaを起動をしてから、パラメータごとの処理は下記の流れになります。
Lambda起動
└ get_latest_sg_file ← S3から前回CSV取得
└ describe_security_groups ← EC2からSG取得
└ convert_to_csv ← 今回のCSV生成
└ calculate_diff_by_fields ← 差分検出
└ convert_diff_to_csv ← 差分CSV生成
└ upload_to_s3 ← 差分CSVアップロード
└ upload_to_s3 ← 今回のCSVアップロード
そして、下記の権限を許可したポリシーを付与したロールを割り当てます。
- ec2:DescribeSecurityGroups :セキュリティグループの情報取得(describe_security_groups() で使用)
- s3:PutObject S3へファイルをアップロード(CSV保存)
- s3:GetObject S3からファイルをダウンロード(過去CSV取得)
- s3:ListBucket S3バケット内のファイル一覧を取得(最新ファイル検索に使用)
- logs:CreateLogGroup CloudWatchロググループの新規作成(初回起動時など)
- logs:CreateLogStream 関数ごとのログストリーム作成
- logs:PutLogEvents ログイベント(printやエラー)の出力
{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Action”: [
“ec2:DescribeSecurityGroups”,
“s3:PutObject”,
“s3:GetObject”,
“s3:ListBucket”
],
“Resource”: “*”
},
{
“Effect”: “Allow”,
“Action”: [
“logs:CreateLogGroup”,
“logs:CreateLogStream”,
“logs:PutLogEvents”
],
“Resource”: [
“arn:aws:logs:us-east-2:アカウントID:log-group:/aws/lambda/SGtest:*”,]
}
]
}
EventBridgeスケジュール の作成
EventBridgeは、定期的に処理を実行したいときに使えるサービスです。
今回は、「半年に1回 セキュリティグループの設定を見直す」というPCI DSS 4.0の要件に対応するために、Cron式スケジュールを使って、6ヶ月ごとにLambda関数を実行するように設定しました。
Amazon EventBridge → スケジュールからスケジュールを作成より、以下の流れで作成していきます。
- スケジュール名:SG_Information
- 説明(オプション):SG_Information
- スケジュールグループ:default
- 頻度:定期的なスケジュール
- タイムゾーン:Asia/Tokyo (UTC+09:00)
- スケジュールの種類:cron式スケジュール
- cron 式:cron(0 0 1 6 ? *) ※半年に1回実行するため
- ターゲットの詳細:AWS Lambda Invoke
- Lambda関数:セキュリティグループの情報を取得するLambda関数(SGtest) を選択
- 実行ロール:EventBridgeを実行するロールをここで作成 or 既存のロールの選択。
※ ちなみに今回は、下記の許可ポリシーを適用したロールを付与しています。
- 許可ポリシー(実行するLambda関数のみを指定したポリシー)
{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Action”: [
“lambda:InvokeFunction”
],
“Resource”: [
“arn:aws:lambda:us-east-2:#アカウントID:function:#セキュリティグループの情報を取得するLambda関数(SGtest):*”
]
}
]
}
- 信頼ポリシー(実施するAWSアカウントのAmazon EventBridge Scheduler がこのロールを使える設定)
{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Principal”: {
“Service”: “scheduler.amazonaws.com”
},
“Action”: “sts:AssumeRole”,
“Condition”: {
“StringEquals”: {
“aws:SourceAccount”: “アカウントID”
}
}
}
]
}
必要なAWSサービスと実装方法について
実際にEvent Bridge スケジュール→ Lambda関数 の実行により、出力された状態は以下になります。
- SG_current_#実行日時.csv:関数実行時点でのセキュリティグループを取得したファイル
- SG_diff_#実行日時.csv:差分のみが記載されたファイル
実際のファイルの中身はこちらです。
SG_current_#実行日時.csv:関数実行時点でのセキュリティグループを取得したファイル
項目ごとの記載内容は以下です。
- GroupName セキュリティグループの名前
- GroupId セキュリティグループの一意なID
- VpcId このSGが属しているVPCのID
- Description SG作成時に入力した説明
- Protocol 通信プロトコル(tcp, udp, icmp など)
- FromPort 許可された範囲の開始ポート番号
- ToPort 許可された範囲の終了ポート番号
- CidrIp 通信を許可する送信元IP範囲
SG_diff_#実行日時.csv:差分のみが記載されたファイル
項目ごとの記載内容は以下です。
- ChangeType 差分の種別(Added, Removed, Updated)
- GroupName 対象のセキュリティグループ名
- GroupId 追加や削除されたセキュリティグループのID
- VpcId 対象のVPC
- Description 対象セキュリティグループの説明
- Protocol / FromPort / ToPort / CidrIp 削除または追加された通信ルールの内容
これら2つファイルより、現行のセキュリティグループの設定と前回取得分からの変更点を自動且つ楽に管理することができます!
まとめ
本記事でご紹介した仕組みにより、セキュリティグループの設定変更を確実に記録し、定期的なレビューや監査対応を効率的に行うことが可能になります。
今後も運用の手間を減らしつつ、セキュリティ水準の維持・向上につながる仕組みづくりを進めていきたいと思います!