クラウドインテグレーション事業部の池田(kiwi_clp)です。
AWSのコスト分析についての記事です。
概要
Amazon EventBridge+AWS Lambdaを利用してSlackに日次で特定のAWSアカウントのコスト利用状況を投稿するコードの紹介です。
コードの仕様は以下のとおりです。
- 日別コストの出力
- 日別利用Top5サービスの出力
- 3日前の24時間利用コストを基準に過去2週間の平均コストを比較して増加傾向のサービスの可視化(コストデータが揃うのが2-3日後のため3日前を基準)
- Top5利用サービスの詳細出力
上記の情報出力、可視化を行うことで異常なコスト利用状況がないか、コストが増加傾向かまたは減少傾向かを確認できます。
特定のAWSアカウント内でどのサービスが一番コストがかかっていたか、コストがかかっているサービスのうち、どの項目がコストが高いかなどAWSアカウントにログインせずSlackを見るだけで把握ができます。
構成図
Amazon EventBridgeでAWS Lambdaを実行し、AWS Cost Explorerから請求データを取得し、分析、メッセージ加工をしてSlack Webhookを通じてSlackの指定チャンネルに書き込みを行います。
Slack出力例
以下のようなデータがSlackに出力されます。
日別コスト
日別コストTop5サービス
3日前と2週間平均のデータ比較による増減%の表示
日別コストTop5サービスの内訳
pythonコード
以下コードをコピーしてご利用可能です。
Claude Sonnet 3.7で作成しました。
import boto3 import json import datetime import os import logging import urllib.request import urllib.error from collections import defaultdict # ロガーの設定 logger = logging.getLogger() logger.setLevel(logging.INFO) # Cost Explorer クライアントの初期化 ce_client = boto3.client('ce') def lambda_handler(event, context): """ AWS Lambdaのエントリーポイント AWS Cost Explorerを使用して3日前のコストデータと過去平均を比較し、結果をSlackに通知します """ try: # 日付の計算 today = datetime.datetime.now() # 3日前の日付(対象日) target_date = (today - datetime.timedelta(days=3)).strftime('%Y-%m-%d') # 3日前の翌日の日付(Cost Explorer APIの終了日は範囲に含まれないため) target_date_next = (today - datetime.timedelta(days=2)).strftime('%Y-%m-%d') # 14日前の日付(過去の開始日) past_start_date = (today - datetime.timedelta(days=14)).strftime('%Y-%m-%d') # 4日前の日付(過去の終了日) past_end_date = (today - datetime.timedelta(days=4)).strftime('%Y-%m-%d') # 今月の最初の日 first_day_of_month = today.replace(day=1).strftime('%Y-%m-%d') # 3日前のコスト取得 logger.info(f"3日前({target_date})のコストデータを取得します") target_costs = get_cost_by_service(target_date, target_date_next) # 過去期間(14日前〜4日前)のコスト取得 logger.info(f"過去期間({past_start_date}〜{past_end_date})のコストデータを取得します") past_costs = get_cost_by_service(past_start_date, past_end_date) # 平均コストの計算 days_count = (datetime.datetime.strptime(past_end_date, '%Y-%m-%d') - datetime.datetime.strptime(past_start_date, '%Y-%m-%d')).days logger.info(f"過去{days_count}日間の平均を計算します") avg_costs = {} for service, cost in past_costs.items(): avg_costs[service] = cost / days_count if days_count > 0 else 0 # コスト比較 comparison = compare_costs(target_costs, avg_costs) # Top5のサービス top_services = get_top_services(target_costs, 5) # サービス内訳の取得(上位3サービスについて内訳を取得) service_details = {} for service, _ in top_services[:3]: logger.info(f"{service}の内訳を取得します") service_details[service] = get_service_details( service, target_date, target_date_next) # 今月の日別コスト取得 logger.info(f"今月({first_day_of_month}〜{today.strftime('%Y-%m-%d')})の日別コストを取得します") daily_costs = get_daily_costs_for_month(first_day_of_month, (today + datetime.timedelta(days=1)).strftime('%Y-%m-%d')) # 今月の合計コスト計算 month_total = sum(daily_costs.values()) logger.info(f"今月の合計コスト: {month_total:.2f}") # Slackに通知 logger.info("Slackに通知を送信します") send_to_slack(comparison, top_services, service_details, daily_costs, month_total, target_date, past_start_date, past_end_date) return { 'statusCode': 200, 'body': json.dumps('Cost analysis completed successfully') } except Exception as e: logger.error(f"エラーが発生しました: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_cost_by_service(start_date, end_date): """ 特定の日付範囲でサービスごとのコストを取得する Parameters: start_date (str): 開始日(YYYY-MM-DD形式) end_date (str): 終了日(YYYY-MM-DD形式、この日は含まれない) Returns: dict: サービス名をキー、コスト合計を値とする辞書 """ service_costs = defaultdict(float) next_token = None # Cost Explorer APIはページング処理が必要な場合があるため、 # NextTokenがなくなるまで繰り返し取得 while True: kwargs = { 'TimePeriod': { 'Start': start_date, 'End': end_date }, 'Granularity': 'DAILY', 'Metrics': ['UnblendedCost'], 'GroupBy': [ { 'Type': 'DIMENSION', 'Key': 'SERVICE' } ] } if next_token: kwargs['NextPageToken'] = next_token response = ce_client.get_cost_and_usage(**kwargs) # サービスごとに合計コストを計算 for result in response['ResultsByTime']: for group in result['Groups']: service = group['Keys'][0] cost = float(group['Metrics']['UnblendedCost']['Amount']) service_costs[service] += cost # 次のページがあるか確認 if 'NextPageToken' in response: next_token = response['NextPageToken'] else: break return dict(service_costs) def get_daily_costs_for_month(start_date, end_date): """ 特定の期間の日別コスト合計を取得する Parameters: start_date (str): 開始日(YYYY-MM-DD形式) end_date (str): 終了日(YYYY-MM-DD形式、この日は含まれない) Returns: dict: 日付をキー、その日の合計コストを値とする辞書 """ daily_costs = {} next_token = None # Cost Explorer APIで日別のコスト情報を取得 while True: kwargs = { 'TimePeriod': { 'Start': start_date, 'End': end_date }, 'Granularity': 'DAILY', 'Metrics': ['UnblendedCost'] } if next_token: kwargs['NextPageToken'] = next_token try: response = ce_client.get_cost_and_usage(**kwargs) # 日別のコスト集計 for result in response['ResultsByTime']: date = result['TimePeriod']['Start'] cost = float(result['Total']['UnblendedCost']['Amount']) daily_costs[date] = cost # 次のページがあるか確認 if 'NextPageToken' in response: next_token = response['NextPageToken'] else: break except Exception as e: logger.error(f"日別コスト取得中にエラーが発生: {str(e)}") break # 日付順にソートして返す return dict(sorted(daily_costs.items())) def get_service_details(service_name, start_date, end_date): """ 特定のサービスの詳細なコスト内訳を取得する Parameters: service_name (str): AWS サービス名 start_date (str): 開始日(YYYY-MM-DD形式) end_date (str): 終了日(YYYY-MM-DD形式、この日は含まれない) Returns: list: サービスの内訳情報のリスト(内訳項目、コスト) """ usage_details = [] next_token = None # UsageType によるグループ化でコスト詳細を取得 while True: kwargs = { 'TimePeriod': { 'Start': start_date, 'End': end_date }, 'Granularity': 'DAILY', 'Metrics': ['UnblendedCost'], 'Filter': { 'Dimensions': { 'Key': 'SERVICE', 'Values': [service_name] } }, 'GroupBy': [ { 'Type': 'DIMENSION', 'Key': 'USAGE_TYPE' } ] } if next_token: kwargs['NextPageToken'] = next_token try: response = ce_client.get_cost_and_usage(**kwargs) # 使用タイプごとのコストを集計 for result in response['ResultsByTime']: for group in result['Groups']: usage_type = group['Keys'][0] # リージョン情報とサービス名を取り除いて表示を簡略化 display_name = simplify_usage_type(usage_type, service_name) cost = float(group['Metrics']['UnblendedCost']['Amount']) # 既存エントリの検索 existing = next((item for item in usage_details if item['name'] == display_name), None) if existing: existing['cost'] += cost else: usage_details.append({ 'name': display_name, 'cost': cost }) # 次のページがあるか確認 if 'NextPageToken' in response: next_token = response['NextPageToken'] else: break except Exception as e: logger.error(f"サービス詳細取得中にエラーが発生: {str(e)}") break # コスト降順でソート usage_details = sorted(usage_details, key=lambda x: x['cost'], reverse=True) # 上位10項目のみを返す(小さすぎる値は除外) return [item for item in usage_details if item['cost'] >= 0.1][:10] def simplify_usage_type(usage_type, service_name): """ 使用タイプの表示を簡略化する Parameters: usage_type (str): 元の使用タイプ文字列(例: USW2-DataProcessing-Bytes) service_name (str): サービス名 Returns: str: 簡略化した使用タイプ名 """ # リージョン情報を削除 if '-' in usage_type: parts = usage_type.split('-', 1) if parts[0] in ['USE1', 'USW1', 'USW2', 'EUW1', 'APS1', 'APN1', 'APS2', 'EUC1', 'SAE1', 'UGW1']: usage_type = parts[1] # サービス固有の整形 if service_name == 'AmazonCloudWatch': if 'DataScanned' in usage_type: return 'データスキャン' elif 'DataProcessing' in usage_type: return 'データ処理' elif 'DataIngestion' in usage_type: return 'データ取り込み' elif 'Requests' in usage_type: if 'Put' in usage_type: return 'Put リクエスト' elif 'Get' in usage_type: return 'Get リクエスト' else: return 'API リクエスト' elif 'AlarmMonitorUsage' in usage_type: return 'アラームモニタリング' elif 'DashboardsUsageHour' in usage_type: return 'ダッシュボード使用' # EC2 固有の整形 elif 'EC2' in service_name: if 'BoxUsage' in usage_type: # インスタンスタイプの取得(例: BoxUsage:t3.micro → t3.micro) instance_type = usage_type.split(':')[1] if ':' in usage_type else usage_type return f'インスタンス実行 ({instance_type})' elif 'EBS:VolumeUsage' in usage_type: return 'EBSボリューム' elif 'DataTransfer' in usage_type: if 'In' in usage_type: return 'データ転送 (受信)' elif 'Out' in usage_type: return 'データ転送 (送信)' else: return 'データ転送' # RDS 固有の整形 elif 'RDS' in service_name: if 'InstanceUsage' in usage_type: instance_info = usage_type.split(':')[1] if ':' in usage_type else usage_type return f'DBインスタンス実行 ({instance_info})' elif 'Storage' in usage_type: return 'DBストレージ' # Lambda 固有の整形 elif 'Lambda' in service_name: if 'Request' in usage_type: return 'リクエスト' elif 'GB-Second' in usage_type: return 'コンピューティング時間' # S3 固有の整形 elif 'S3' in service_name: if 'Storage' in usage_type: return 'ストレージ' elif 'Requests' in usage_type: if 'GET' in usage_type: return 'GETリクエスト' elif 'PUT' in usage_type: return 'PUTリクエスト' else: return 'リクエスト' # ELB 固有の整形 elif 'LoadBalancing' in service_name: if 'LoadBalancerUsage' in usage_type: return 'ロードバランサー使用' elif 'LCUUsage' in usage_type: return 'LCU使用' # 適切な変換がない場合はそのまま返す return usage_type def compare_costs(current_costs, avg_costs): """ 現在のコストと平均コストを比較する Parameters: current_costs (dict): 対象日のサービスごとのコスト avg_costs (dict): 過去期間のサービスごとの平均コスト Returns: dict: サービスごとの比較結果 """ comparison = {} # すべてのサービスを取得(両方のデータセットの和集合) all_services = set(current_costs.keys()) | set(avg_costs.keys()) # コスト閾値 ($0.5以上を有意な値とする) cost_threshold = 0.5 # 差分閾値 ($0.1以上の差分のみ有意とする) diff_threshold = 0.1 # パーセンテージ変化の閾値 (1%以上の変化のみ有意とする) percentage_threshold = 1.0 for service in all_services: current_cost = current_costs.get(service, 0) avg_cost = avg_costs.get(service, 0) # 差分計算 diff = current_cost - avg_cost # 差分が閾値未満の場合、0に設定 if abs(diff) < diff_threshold: diff = 0 percentage = 0 # 両方のコストが閾値未満で非常に小さい場合 elif current_cost < cost_threshold and avg_cost < cost_threshold: percentage = 0 # 平均コストが十分にある場合、通常の計算 elif avg_cost >= cost_threshold: percentage = (diff / avg_cost) * 100 # 変化率が小さすぎる場合は0に設定 if abs(percentage) < percentage_threshold: percentage = 0 # 平均コストが小さく、現在コストが大きい場合(新規サービス) else: percentage = float('inf') comparison[service] = { 'current_cost': current_cost, 'average_cost': avg_cost, 'difference': diff, 'percentage': percentage, # 両方のコストが閾値以上、または大きな差分がある場合のみ重要と判断 'is_significant': (current_cost >= cost_threshold or avg_cost >= cost_threshold or abs(diff) >= diff_threshold) } return comparison def get_top_services(service_costs, top_n=5): """ コストが最も高いTop Nのサービスを取得する Parameters: service_costs (dict): サービスごとのコスト top_n (int): 取得するサービス数 Returns: list: (サービス名, コスト)のタプルのリスト(コスト降順) """ # コストの降順でソート sorted_services = sorted(service_costs.items(), key=lambda x: x[1], reverse=True) # Top Nを返す return sorted_services[:top_n] def format_daily_costs(daily_costs, currency_symbol): """ 日別コストをテキスト形式のグラフに変換する Parameters: daily_costs (dict): 日付をキー、コストを値とする辞書 currency_symbol (str): 通貨記号 Returns: str: フォーマットされた日別コスト表示テキスト """ if not daily_costs: return "_日別コストデータがありません_" # 最大コストを求めてスケーリング max_cost = max(daily_costs.values()) bar_length = 15 # バーの最大長 # 出力テキスト result = [] # 日付ごとにバーチャートを作成 for date, cost in daily_costs.items(): # 日付フォーマットを変更 (YYYY-MM-DD → MM/DD) display_date = f"{date[5:7]}/{date[8:10]}" # スケールしたバーの長さを計算 scaled_length = int((cost / max_cost) * bar_length) if max_cost > 0 else 0 bar = '█' * scaled_length # 結果に追加 result.append(f"{display_date}: {currency_symbol}{cost:.2f} {bar}") return '\n'.join(result) def send_to_slack(comparison, top_services, service_details, daily_costs, month_total, target_date, past_start_date, past_end_date): """ コスト分析結果をSlackに通知する Parameters: comparison (dict): サービスごとのコスト比較結果 top_services (list): Top Nサービスのリスト service_details (dict): サービスごとの詳細内訳情報 daily_costs (dict): 日別のコスト month_total (float): 今月の合計コスト target_date (str): 対象日 past_start_date (str): 過去期間の開始日 past_end_date (str): 過去期間の終了日 """ # SlackのWebhook URLを環境変数から取得 webhook_url = os.environ.get('SLACK_WEBHOOK_URL') if not webhook_url: logger.error("SLACK_WEBHOOK_URL環境変数が設定されていません") return # 通貨単位(デフォルトはUSD) currency = os.environ.get('CURRENCY', 'USD') currency_symbol = '$' # デフォルト通貨シンボル # 通貨に応じてシンボルを設定 if currency == 'JPY': currency_symbol = 'JPY ' # 円記号を使わずテキストとスペースで表現 elif currency == 'EUR': currency_symbol = 'EUR ' # ユーロ記号を使わずテキストとスペースで表現 # 簡素化したメッセージ形式に変更 message_text = f"*AWS コスト分析レポート ({target_date})*\n\n" message_text += f"{target_date}のコストと{past_start_date}〜{past_end_date}の平均コストの比較\n\n" # 今月のコスト合計を追加 message_text += f"*今月の合計コスト: {currency_symbol}{month_total:.2f}*\n\n" # 日別コストの表示 message_text += f"*今月の日別コスト*\n" message_text += format_daily_costs(daily_costs, currency_symbol) + "\n\n" # Top5サービスの情報を追加 message_text += f"*Top 5 サービス ({target_date})*\n" for service, cost in top_services: message_text += f"*{service}*: {currency_symbol}{cost:.2f}\n" message_text += "\n" # 比較結果の詳細を追加 message_text += "*サービスごとのコスト比較*\n" # コスト増加率の高い順にソート # 重要なサービスのみ対象とし、変化率が0より大きいもの(増加したもの)を優先 significant_items = [item for item in comparison.items() if item[1]['is_significant'] and item[1]['percentage'] != 0] # まず増加率の高いものを表示 increasing_items = sorted([item for item in significant_items if item[1]['percentage'] > 0], key=lambda x: x[1]['percentage'], reverse=True) # 次に減少率の高いものを表示 decreasing_items = sorted([item for item in significant_items if item[1]['percentage'] < 0], key=lambda x: x[1]['percentage']) # 増加と減少を合わせて表示(増加が先) sorted_comparison = increasing_items + decreasing_items # 上限を10項目に制限 sorted_comparison = sorted_comparison[:10] if not sorted_comparison: message_text += "_有意な変化のあるサービスはありません_\n" for service, data in sorted_comparison: current = data['current_cost'] average = data['average_cost'] diff = data['difference'] percentage = data['percentage'] # 変化の方向に応じて矢印と色を設定 if diff > 0: arrow = "🔴 ↑" elif diff < 0: arrow = "🟢 ↓" else: arrow = "⚪️ =" # 変化率が無限大または非常に小さい場合は特別な処理 if percentage == float('inf'): percentage_text = "新規" # 無限大の場合は「新規」と表示 elif percentage == 0: # 変化なし percentage_text = "変化なし" arrow = "⚪️ =" # 変化なしの矢印を強制的に設定 else: percentage_text = f"{percentage:.2f}%" message_text += f"*{service}*: " message_text += f"{target_date}: {currency_symbol}{current:.2f} | 平均: {currency_symbol}{average:.2f} | " message_text += f"差分: {currency_symbol}{diff:.2f} ({percentage_text}) {arrow}\n" # サービス内訳の詳細を追加 if service_details: message_text += "\n*サービス内訳詳細*\n" for service, details in service_details.items(): # 内訳が存在する場合のみ表示 if details: message_text += f"\n*{service}* の内訳 ({target_date}):\n" for item in details: message_text += f"- {item['name']}: {currency_symbol}{item['cost']:.2f}\n" # シンプルなペイロードを使用 message = { "text": message_text } logger.info(f"送信するSlackメッセージ形式: {json.dumps(message)[:200]}...") # Slackにメッセージを送信 try: # メッセージをJSONに変換 data = json.dumps(message).encode('utf-8') # デバッグ用にリクエストボディの一部をログに出力 logger.info(f"リクエストボディ先頭100文字: {data[:100]}") # リクエストを作成 req = urllib.request.Request( webhook_url, data=data, headers={'Content-Type': 'application/json'} ) # リクエストを送信 with urllib.request.urlopen(req) as response: response_body = response.read().decode('utf-8') if response.status == 200: logger.info("Slack通知を送信しました") else: logger.error(f"Slack通知に失敗しました: {response.status}, {response_body}") except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') if hasattr(e, 'read') else 'エラーボディなし' logger.error(f"Slack通知の送信中にHTTPエラーが発生しました: {e.code}, {e.reason}, ボディ: {error_body}") except urllib.error.URLError as e: logger.error(f"Slack通知の送信中にURLエラーが発生しました: {e.reason}") except Exception as e: logger.error(f"Slack通知の送信中にエラーが発生しました: {str(e)}")
個別設定
Slack
- 出力したい指定のチャンネルのWebHook URLを作成
Amazon EventBridge
- AWS Lambdaを実行するためにcronでトリガー設定を実施
AWS Lambda
- pythonで作成、バージョンは3.13で動作確認済み
- タイムアウトを15秒に設定
- 環境変数でキーに【SLACK_WEBHOOK_URL】、値にWebHook URLを記載
- 環境変数でキーに【CURRENCY】、値に【USD】を記載
- AWS Cost Explorerからデータが取れるようにIAMロールに【Allow: ce:GetCostAndUsage】【Allow: logs:CreateLogGroup】【Allow: logs:PutLogEvents】の許可
- AWS Lambdaの標準で作成されるロールにマネージドポリシーの【AWSCostAndUsageReportAutomationPolicy】アタッチでもOKです。
IAMポリシー1
<br />{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ce:GetCostAndUsage" ], "Resource": "*" } ] }
IAMポリシー2
<br />{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:ap-northeast-1:{AWSアカウント番号}:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:ap-northeast-1:{AWSアカウント番号}:log-group:/aws/lambda/{Lamba名}:*" ] } ] }
本コードは弊社の検証環境AWSアカウントで導入をしています。
メンバーへコスト利用状況の周知、異常なコスト増が発生していないかを確認、どのサービスがコストが高いか確認などを把握してもらう目的で作成しました。
汎用的に使えるコードになっているのでぜひ活用してください!