こんにちは、ひろかずです。
前回では、Deep Security as a Service(以下、DSaaS)で発生するイベントを全てSNSトピックでSQSへキューイングするところまで行いました。

今回は、主な需要のうち、DSaaSイベントの長期保存について一筆書きます。
3575e5de-ae74-0bf8-7163-74092dbffbd3

今回実装すること

  • Lambdaを5分毎に定期実行する。
  • SQSに蓄積したSNSトピックからDSaaSイベント情報を抜き出し、S3へ保存する
  • 処理したキューは削除する。

ざっくり構成(今回の分)

38830590-b8d3-9c7c-c50a-34fc8ae87b8c

参考情報

今回は、こちらが非常に役に立ちました。
Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする

工程

1.格納先となるs3バケットを作成する。
2.Lambda Functionを作成する。
3.IAM権限を設定する。
4.実行結果

1. 格納先となるs3バケットを作成する。

バケット名をdsaas-snsと設定しました。
リージョンは、SQSの場所と合せました。

56b8b938-37a4-f94d-0918-dccfd96f87dd

できましたね。

b6bbd5ea-34ab-bed8-f56d-eebe0bdebdd3

2. Lambda Functionを作成する。

dsaas-sns-sqs-s3という名前のLambda Functionを作成しました。
こんな感じで設定しています。

477c772a-befe-ae62-45ab-b3917b0c836f

定期実行設定は、トリガー画面でCloudWatch Eventsで設定します。

01150d6d-09c1-35db-96d8-74d634d8682e

コードはこのように作成しました。
s3に保存されるファイル名は、DSaaSイベントのタイムスタンプを設定するようにしています。

import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'dsaas-sns'
maxNumberOfMessages = 10
AWS_S3_BUCKET_NAME = 'dsaas-sns'

def lambda_handler(event, context):

    try:
         logger.info(event)

         queue = boto3.resource('sqs').get_queue_by_name(
             QueueName = queueName
         )

         messages = queue.receive_messages(
             MaxNumberOfMessages = maxNumberOfMessages
         )

         entries = []
         items = []
         for message in messages:
              entries.append({
                   "Id": message.message_id,
                   "ReceiptHandle": message.receipt_handle
              })
              items.append({
                   "UnsubscribeURL": json.loads(message.body)['UnsubscribeURL'],
                   "SigningCertURL": json.loads(message.body)['SigningCertURL'],
                   "Signature": json.loads(message.body)['Signature'],
                   "SignatureVersion": json.loads(message.body)['SignatureVersion'],
                   "Timestamp": json.loads(message.body)['Timestamp'],
                   "Message": json.loads(message.body)['Message'],
                   "TopicArn": json.loads(message.body)['TopicArn'],
                   "MessageId": json.loads(message.body)['MessageId'],
                   "Type": json.loads(message.body)['Type']
              })
              s3 = boto3.resource('s3')
              bucket = s3.Bucket(AWS_S3_BUCKET_NAME)
              for item in items:
                   PUT_OBJECT_KEY_NAME = item['Timestamp']
                   obj = bucket.Object(PUT_OBJECT_KEY_NAME)
                   body = item['Message']

                   response = obj.put(
                       Body=body.encode('utf-8'),
                       ContentEncoding='utf-8',
                       ContentType='text/plane'
                   )

         response = {}
         if len(entries) != 0:
             response = queue.delete_messages(
                  Entries = entries
             )

         logger.info(response)
         return response

    except Exception as e:
         logger.error(e)
         raise e

3. IAM権限を追加する。

sns-sqs-s3.lambda_roleにSQSとs3の権限を追加します。

27be3ca6-3b8c-15d3-140c-527b66441c1a

4. 実行結果

SQSのメトリクスでキューが順調に処理されているのがわかりますね。

2ad393ba-511b-649e-3df0-6c4247ca9e79

s3にもイベントがオブジェクトとして保存されています。

2c8abfd1-ba21-d8a5-ffe5-8d17a63c98d5

オブジェクトの内容もDSaaSのイベントだけになっていますね。

1ab7a9d5-c0b1-0084-1c43-07df5e7d34a4

既存のイベントをSNSトピックとして飛ばすことはできませんが、この実装によってDSaaS上で新たに発生したイベントは全てs3に保存できるようになりました。

今日はここまでです。
お疲れ様でした。

元記事はこちら

SQSに蓄積したDeep SecurityのSNSトピックをLambda(python)でS3に保存する