はじめに

こんにちは、MSPの田所です。

みなさんはサブスクしてますか?
動画?音楽?食事?はたまた自動車?
世の中にはたくさんのサブスクサービスがありますね。

今日は Amazon SNS のサブスクリプションで遊んでみたのでご紹介です。
あなたはいくつサブスクしたことがありますか?

 

色々サブスクライブしてみる

AWS では多くのサブスクリプションのプロトコルをサポートしています。
色々なタイプのサブスクリプションを作成して PagerDuty(と電話)に送ってみました。

 

1. Eメール

SNS から PagerDuty の Integration Email にメール送信するシンプルな構成です。

設定

PagerDuty サービスを用意して、Integration Email を控えます。

 

まず SNS トピックを作成します。
プロトコル「E メール」で SNS サブスクリプションを作成します。

 

PagerDuty サービスに “AWS Notification – Subscription Confirmation” の通知が届くので、サブスクリプションの確認を行います。

 

ここでリンクをクリックせずにアドレスをコピーし、AWS 側でサブスクリプションの確認を行うのがコツです。
そうすることで PagerDuty のユーザー権限ではサブスクリプション解除ができなくなります。

「通知についてるサブスクリプション解除のリンクを間違えて押したら、それから通知が来なくなっちゃった!」

のような事態を避けられます。

 

ちなみに「保留中の確認」や「削除済み」のサブスクリプションはコンソール上で消せないので、残ってしまったら自然に消えるまで3日間大人しく待ちましょう。
エンドポイントを間違えたり削除した場合、PagerDuty はじめ AWS 外部からサブスクリプションを解除した場合、などが当てはまります。

 

2. AWS Lambda

SNS から PagerDuty の Events API V2 エンドポイントに POST リクエストを送ります。

設定

PagerDuty サービスの Events API V2 の Integration Key を控えます。

 

Lambda 関数を作成します。

 

例えば Python コードは以下のようになります。
SNS からの text/plain 形式のメッセージを、JSON 形式に変換して PagerDuty に送ります。
“\n” など、PagerDuty Events API V2 で受け付けられない文字があるため注意が必要です。
ここではメッセージ本文に含まれる “\n” はスペースに置き換えています。

また先程控えた Integration Key を 環境変数 “PAGERDUTY_INTEGRATION_KEY” に登録しておきます。

import json
import http.client
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGERDUTY_INTEGRATION_KEY = os.environ['PAGERDUTY_INTEGRATION_KEY']

def lambda_handler(event, context):
    for record in event['Records']:
        sns_message = record['Sns']

        # Combine Subject from SNS message with the fixed string "Lambda"
        incident_title = f"{sns_message.get('Subject', '')} (Lambda)"        

        # Transform the data to match PagerDuty Events API v2 requirements
        transformed_data = {
            "routing_key": PAGERDUTY_INTEGRATION_KEY,
            "event_action": "trigger",
            "payload": {
                "summary": incident_title,
                "source": "subscription-sns-lambda",
                "severity": "critical",
                "custom_details": {
                    "message": sns_message.get('Message', '').replace('\n', ' ')
                }
            }
        }
        
        # Send transformed data to PagerDuty
        trigger_pagerduty_alert(transformed_data)
        
    return {
        'statusCode': 200,
        'body': json.dumps('Processed successfully')
    }

def trigger_pagerduty_alert(transformed_data):
    conn = http.client.HTTPSConnection("events.pagerduty.com")
    headers = {
        'Content-Type': 'application/json'
    }
    payload = json.dumps(transformed_data)
    
    conn.request("POST", "/v2/enqueue", payload, headers)
    response = conn.getresponse()
    data = response.read()
    
    logger.info(f"PagerDuty response: {response.status} {response.reason}")
    logger.info(f"PagerDuty response data: {data.decode('utf-8')}")
    conn.close()

 

プロトコル「AWS Lambda」で SNS サブスクリプションを作成します。
エンドポイントの情報が正しければ、すぐに「確認済み」になります。
そして対象の Lambda の実行権限が SNS に自動で付与されます。

 

3. Amazon SQS

SNS から SQS にメッセージを送り、Lambda 経由で PagerDuty に POST リクエストを送ります。
Lambda のみよりメッセージ消失のリスクが低い構成です。

設定

Lambda 関数を作成します。
実行ロールに SQS へのアクセス権限が必要です。

 

Python コードの例は以下のようになります。
先程の Lambda 関数とほとんど同じですが、SQS で JSON 形式となったレコードから本文を抽出する工程が加わっています。

同じく Integration Key を 環境変数 “PAGERDUTY_INTEGRATION_KEY” に登録します。

import json
import http.client
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGERDUTY_INTEGRATION_KEY = os.environ['PAGERDUTY_INTEGRATION_KEY']

def lambda_handler(event, context):
    for record in event['Records']:
        # Extract and parse the SNS message from the SQS message body
        sns_message = json.loads(record['body'])
        
        # Combine Subject from SNS message with the fixed string "SQS"
        incident_title = f"{sns_message.get('Subject', '')} (SQS)"
        
        # Transform the data to match PagerDuty Events API v2 requirements
        transformed_data = {
            "routing_key": PAGERDUTY_INTEGRATION_KEY,
            "event_action": "trigger",
            "payload": {
                "summary": incident_title,
                "source": "subscription-sns-sqs",
                "severity": "critical",
                "custom_details": {
                    "message": sns_message.get('Message', '').replace('\n', ' ')
                }
            }
        }
        
        # Send transformed data to PagerDuty
        trigger_pagerduty_alert(transformed_data)
        
    return {
        'statusCode': 200,
        'body': json.dumps('Processed successfully')
    }

def trigger_pagerduty_alert(transformed_data):
    conn = http.client.HTTPSConnection("events.pagerduty.com")
    headers = {
        'Content-Type': 'application/json'
    }
    payload = json.dumps(transformed_data)
    
    conn.request("POST", "/v2/enqueue", payload, headers)
    response = conn.getresponse()
    data = response.read()
    
    logger.info(f"PagerDuty response: {response.status} {response.reason}")
    logger.info(f"PagerDuty response data: {data.decode('utf-8')}")
    conn.close()

 

次に SQS キューを作成し、Lambda 関数をトリガーに設定します。

 

プロトコル「Amazon SQS」で SNS サブスクリプションを作成します。
エンドポイントの情報が正しければ、すぐに「確認済み」になります。
そして SQS のアクセスポリシーに SNS からのアクセス許可が自動で付与されます。

 

4. Amazon Data Firehose

SNS から Data Firehose にメッセージをストリーミングし、Lambda 経由で PagerDuty に送信します。
ストリーミング先は分析サービスや外部の監視ツールになることが多いと思いますが、とにかくこんな感じでも連携できます、ということで。

設定

Lambda 関数を作成します。
実行ロールに Firehose へのアクセス権限を付与します。

 

Python コードの例は以下の通りです。
今度は Base64 のデコードが必要になります。

そして Integration Key は 環境変数 “PAGERDUTY_INTEGRATION_KEY” に登録します。

import json
import http.client
import logging
import os
import base64

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGERDUTY_INTEGRATION_KEY = os.environ['PAGERDUTY_INTEGRATION_KEY']

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Decode the base64-encoded data
        decoded_data = base64.b64decode(record['data']).decode('utf-8')
        sns_message = json.loads(decoded_data)
        
        # Combine Subject from SNS message with the fixed string "Firehose"
        incident_title = f"{sns_message.get('Subject', '')} (Firehose)"

        # Transform the data to match PagerDuty Events API v2 requirements
        transformed_data = {
            "routing_key": PAGERDUTY_INTEGRATION_KEY,
            "event_action": "trigger",
            "payload": {
                "summary": incident_title,
                "source": "subscription-sns-firehose",
                "severity": "critical",
                "custom_details": {
                    "message": sns_message.get('Message', '').replace('\n', ' ')
                }
            }
        }

        # Send transformed data to PagerDuty
        trigger_pagerduty_alert(transformed_data)

        # Prepare the output record for Firehose to indicate successful processing
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': record['data']  # Original data will be passed to S3
        }
        output.append(output_record)

    return {'records': output}

def trigger_pagerduty_alert(transformed_data):
    conn = http.client.HTTPSConnection("events.pagerduty.com")
    headers = {
        'Content-Type': 'application/json'
    }
    payload = json.dumps(transformed_data)
    
    conn.request("POST", "/v2/enqueue", payload, headers)
    response = conn.getresponse()
    data = response.read()
    
    logger.info(f"PagerDuty response: {response.status} {response.reason}")
    logger.info(f"PagerDuty response data: {data.decode('utf-8')}")
    conn.close()

 

次に Firehose ストリームを作成します。
ソースは Direct PUT で、AWS Lambda によるソースレコードの変換と、Amazon S3 バケットへの出力を設定します。
出力用の S3 バケットも作成しておきます。
Firehose ストリームは作成時にサービスロールとして Lambda や S3 に対する権限が付与されます。

 

プロトコル「Amazon Kinesis Data Firehose」で SNS サブスクリプションを作成します。
サブスクリプションロールの欄がありますが、対象の Firehose への権限を持つ IAM ロールを指定します。
エンドポイントやロールが適切であれば、すぐに「確認済み」になります。

 

5. HTTPS

SNS から API Gateway にリクエストを送り、データを成形した上で PagerDuty にリクエストします。
同じ流れで AWS Lambda を使った方が楽だった気もしますが、今回は API Gateway のみで処理することにしました。

設定

API Gateway で REST API を作成します。
/pagerduty などリソースを作成します。
メソッドタイプ POST で HTTP メソッドを作成します。
エンドポイント URL は、PagerDuty サービスの Integration URL (Alert Events) を指定します。

https://events.pagerduty.com/v2/enqueue

 

統合リクエストの編集を行います。
URL リクエストヘッダーのパラメータで Content-Type に text/plain を指定し、
マッピングテンプレートで tenxt/plain に対する処理内容を入力します。

これにより、SNS から text 形式で送られてきたデータを PagerDuty 側で有効な JSON 形式に変換します。

 

マッピングテンプレートの例は以下の通りです。
VTL (Velocity Template Language) と呼ばれるテンプレート用の言語と、JSON の組み合わせの記載となります。
内容は PagerDuty Events API V2 に合わせたフォーマットへの変換ですが、サブスクリプション確認用の通知と、通常のメッセージとで、処理を分岐させています。

編集が完了したら、新しいステージで API をデプロイします。

#set($inputRoot = $input.path('$'))
#if($inputRoot.Type == "SubscriptionConfirmation")
{
  "routing_key": "[PagerDuty Integration Key]",
  "event_action": "trigger",
  "payload": {
    "summary": "Subscription Confirmation",
    "source": "subscription-sns-https",
    "severity": "info",
    "custom_details": {
      "SubscribeURL": "$inputRoot.SubscribeURL",
      "Message": "$inputRoot.Message.replaceAll('\n', ' ')"
    }
  }
}
#else
{
  "routing_key": "[PagerDuty Integration Key]",
  "event_action": "trigger",
  "payload": {
    "summary": "$inputRoot.Subject (HTTPS)",
    "source": "subscription-sns-https",
    "severity": "critical",
    "custom_details": {
      "message": "$inputRoot.Message.replaceAll('\n', ' ')"
    }
  }
}
#end

 

プロトコル「HTTPS」で SNS サブスクリプションを作成します。
エンドポイントは以下のような URL となります。

https://[API名].execute-api.[リージョン名].amazonaws.com/[ステージ名]/[リソース名]

 

PagerDuty サービスに “Subscription Confirmation” の通知が届くので、サブスクリプションの確認を行います。
メールの時と同様、AWS 側でのサブスクリプション確認をおすすめします。

 

6. SMS

SNS から SMS(ショートメッセージサービス)にメッセージを送信します。
携帯電話の番号宛に送ります。
これは PagerDuty での検知ではありませんがご容赦ください。

設定

Amazon SNS のテキストメッセージング (SMS) で電話番号を追加し、検証コードを入力します。

 

プロトコル「SMS」で SNS サブスクリプションを作成します。
検証済みの電話番号であれば、すぐに「確認済み」になります。

 

メッセージを発行する

以下の設定が整いました。

 

充実のサブスクリプションです。

 

せーの

 

どん!!


 

おわりに

ということで SNS トピックにたくさんサブスクしてみました。
色んな種類のおもちゃを繋ぎ合わせて作品とする、夏休みの自由研究の気分です。

ここからサブスクリプションフィルターを使って配信先をコントロールしたり、連携先でさらに連携したり、無限の可能性を感じます。

 

おまけ

SNS トピックを消してもサブスクリプションは残ります。
消さなくても不都合はなさそうですが、トピックを消すなら関連のサブスクリプションも一緒に消しておきましょう。

おしまい