検証環境のリソースに自動でタグ付けするシステム構築

検証環境でリソースを作成する際、誰が作成したのかを明確にするためにタグ付けを行う環境は多いと思います。しかし、実際には作成者タグのないリソースが散見されることはありませんか?

そこで今回は、検証環境に新規リソースを作成する際に、自動的に作成者と作成時刻をタグとして付与するスクリプトを作成しました。このスクリプトを使えば、誰がいつリソースを作成したのかが一目瞭然となり、リソース管理の効率化に繋がります。

さらに、EC2とRDSインスタンスには停止時刻もタグ付けすることで、不要なリソースを容易に発見できるようにしました。

この自動タグ付けシステムは、CloudFormationテンプレートを使用して簡単にデプロイできます。

作成時刻と作成者が自動タグ付けされるリソース

  • EC2
  • EIP
  • VPC
  • NatGateway
  • VPCEndpoint
  • AMI
  • Snapshot
  • EBS
  • ALB
  • NLB
  • RDSインスタンス
  • RDSスナップショット
  • S3バケット

停止時刻が記録されるリソース

  • EC2
  • RDSインスタンス

使い方

※ 実行環境はCloudShellを想定しています。
※ 検証環境での実行を想定しており、実行ユーザーの権限は「AdministratorAccess」を設定してください。

  1. 以下のコマンドを実行し、ファイルをダウンロードします。
    wget https://iret.media/wp-content/uploads/2025/02/Automatic-Tagging-System.zip
  2. ファイルを解凍します。
    unzip Automatic-Tagging-System.zip
  3. ディレクトリに移動します。
    cd Automatic-Tagging-System
  4. ディレクトリ内で、以下のコマンドを実行します。
    • 全リージョンにデプロイする場合:
      python Deploy_All_Region.py
    • 特定のリージョンにデプロイする場合:
      python Deploy_Individual_Region.py
    • デプロイしたスタックを削除する場合:
      python Delete_All_Region.py

      ※ 特定のリージョンのみ削除したい場合は、リージョンのCloudFormationスタックを削除してください。

構成要素

このスクリプトは、以下の要素で構成されています。

  • デプロイ用Pythonスクリプト: CloudFormationスタックの作成・更新、Lambda関数のデプロイを自動化します。
  • CloudFormationテンプレート: Lambda関数、IAMロール、CloudWatch EventsルールなどのAWSリソースを定義します。
  • Lambda関数: 対象リソースへのタグ設定処理を実行します。

●デプロイ用Pythonスクリプトの説明

デプロイ用Pythonスクリプトのフローチャートは以下となります。S3からLambda関数をデプロイする場合、LambdaとS3は同一リージョンに存在する必要があるため、デプロイするリージョンごとにS3バケットを作成しています。

作成されたS3バケットのバケット名とキー情報はCfnのパラメータとしてデプロイ時に参照されます。

フローチャート

●CFnテンプレート及びLambda関数の説明

CFnテンプレートでは、EventBridge Ruleでパブリック AWS サービス API へのリクエストを読み取り、タグ設定用のLambda関数を呼び出しています。

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Automatic-Tagging-System-Deploy'
Parameters:
  TempS3BucketName:
    Type: String
  TempS3KeyName:
    Type: String

Resources:
  LambdaFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/ResourceGroupsandTagEditorFullAccess'
        - 'arn:aws:iam::aws:policy/ResourceGroupsTaggingAPITagUntagSupportedResources'
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'        

  LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Code:
        S3Bucket: !Sub '${TempS3BucketName}'
        S3Key: !Sub '${TempS3KeyName}'
      Role: !GetAtt LambdaFunctionRole.Arn
      Runtime: python3.11
      MemorySize: 128
      Timeout: 30

  LogGroupFirehosLambda:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${LambdaFunction}'
      RetentionInDays: 7

  LambdaFunctionPermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      FunctionName: !GetAtt 
        - LambdaFunction
        - Arn
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt 
        - EventRule
        - Arn

  EventRule: 
    Type: AWS::Events::Rule
    Properties: 
      EventPattern: 
        source: 
          - "aws.ec2"
          - "aws.rds"
          - "aws.elasticloadbalancing"
          - "aws.s3"
        detail-type: 
          - "AWS API Call via CloudTrail"
        detail: 
          eventSource: 
            - "ec2.amazonaws.com"
            - "rds.amazonaws.com"
            - "elasticloadbalancing.amazonaws.com"
            - "s3.amazonaws.com"
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt 
            - LambdaFunction
            - Arn
          Id: TargetFunctionV1

Lambda関数では、与えられたペイロードの中身をCase文で分岐して処理しています。タグ設定対象を追加する場合は、CFnテンプレートの「AWS::Events::Rule」部分とLambdaのCase文を修正してデプロイします。

import boto3, json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

client_ResourceGroupsTaggingApi = boto3.client('resourcegroupstaggingapi')
client_EC2                      = boto3.client('ec2')

def ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, CreationDate):
    client_ResourceGroupsTaggingApi.tag_resources(
        ResourceARNList=[ARN],
        Tags={'AutoSet_Owner_Name': AutoSet_Owner_Name, 'CreationDate': CreationDate}
    )
    return

def ec2ResourceTagging(Resources, AutoSet_Owner_Name, CreationDate):
    client_EC2.create_tags(
        Resources=[Resources],
        Tags=[
            {'Key': 'AutoSet_Owner_Name','Value': AutoSet_Owner_Name},
            {'Key': 'CreationDate','Value': CreationDate},
        ]
    )
    return

def ResourceGroupsTaggingApi_StopDate(ARN, AutoSet_Owner_Name, StopDate):
    client_ResourceGroupsTaggingApi.tag_resources(
        ResourceARNList=[ARN],
        Tags={'LastStopDate': StopDate}
    )
    return

def ec2ResourceTagging_StopDate(Resources, AutoSet_Owner_Name, StopDate):
    client_EC2.create_tags(
        Resources=[Resources],
        Tags=[
            {'Key': 'LastStopDate','Value': StopDate},
        ]
    )
    return


def lambda_handler(event, content):
    #print(event)
    if not 'detail' in event:
        return

    # インスタンス作成者の情報
    AutoSet_Owner_Name = event['detail']['userIdentity']['arn'].split("/")[-1]
    # インスタンス作成日の情報
    # 文字列からdatetimeオブジェクトを作成
    utc_dt = datetime.fromisoformat(event['detail']['eventTime'])
    # タイムゾーンを'JST'に変更
    jst_dt = utc_dt.astimezone(ZoneInfo("Asia/Tokyo"))
    # 出力形式を指定
    eventTime  = jst_dt.strftime('%Y-%m-%dT%H:%M:%S%z')

    # イベント名取得
    match event['detail']['eventName']:
        #EC2関連リソース
        case 'RunInstances':
            resource_id = event['detail']['responseElements']['instancesSet']['items'][0]['instanceId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'AllocateAddress':
            resource_id = event['detail']['responseElements']['allocationId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateVpc':
            resource_id = event['detail']['responseElements']['vpc']['vpcId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateNatGateway':
            resource_id = event['detail']['responseElements']['CreateNatGatewayResponse']['natGateway']['natGatewayId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateVpcEndpoint':
            resource_id = event['detail']['responseElements']['CreateVpcEndpointResponse']['vpcEndpoint']['vpcEndpointId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateImage':
            resource_id = event['detail']['responseElements']['imageId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateSnapshots':
            resource_id = event['detail']['responseElements']['CreateSnapshotsResponse']['snapshotSet']['snapshotId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateVolume':
            resource_id = event['detail']['responseElements']['volumeId']
            ec2ResourceTagging(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        #EC2関連以外のリソース
        #設定可能なリソース→https://docs.aws.amazon.com/ja_jp/ARG/latest/userguide/supported-resources.html
        #リソースのARN形式→https://docs.aws.amazon.com/ja_jp/IAM/latest/UserGuide/reference-arns.html
        case 'CreateLoadBalancer':
            ARN = event['detail']['responseElements']['loadBalancers'][0]['loadBalancerArn']
            ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateDBInstance':
            ARN = event['detail']['responseElements']['dBInstanceArn']
            ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateDBSnapshot':
            ARN = event['detail']['responseElements']['dBSnapshotArn']
            ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateDBCluster':
            ARN = event['detail']['responseElements']['dBClusterArn']
            ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateDBClusterSnapshot':
            ARN = event['detail']['responseElements']['dBClusterSnapshotArn']
            ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'CreateBucket':
            bucketName = event['detail']['requestParameters']['bucketName']
            ARN = f"arn:aws:s3:::{bucketName}"
            ResourceGroupsTaggingApi(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        #EC2とRDSは最終停止日時も設定する
        case 'StopInstances':
            resource_id = event['detail']['responseElements']['instancesSet']['items'][0]['instanceId']
            ec2ResourceTagging_StopDate(resource_id, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'StopDBInstance':
            ARN = event['detail']['responseElements']['dBInstanceArn']
            ResourceGroupsTaggingApi_StopDate(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
        case 'StopDBCluster':
            ARN = event['detail']['responseElements']['dBClusterArn']
            ResourceGroupsTaggingApi_StopDate(ARN, AutoSet_Owner_Name, eventTime)
            return { 'statusCode': 200,'body': json.dumps('Excuted!')}
    return { 'statusCode': 200,'body': json.dumps('Excuted!')}