こんにちは、ひろかずです。
前回では、Deep Security as a Service(以下、DSaaS)で発生するイベントを全てSNSトピックでSQSへキューイングするところまで行いました。
今回は、主な需要のうち、DSaaSイベントの長期保存
について一筆書きます。
今回実装すること
- Lambdaを5分毎に定期実行する。
- SQSに蓄積したSNSトピックからDSaaSイベント情報を抜き出し、S3へ保存する
- 処理したキューは削除する。
ざっくり構成(今回の分)
参考情報
今回は、こちらが非常に役に立ちました。
Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする
工程
1.格納先となるs3バケットを作成する。
2.Lambda Functionを作成する。
3.IAM権限を設定する。
4.実行結果
1. 格納先となるs3バケットを作成する。
バケット名をdsaas-sns
と設定しました。
リージョンは、SQSの場所と合せました。
できましたね。
2. Lambda Functionを作成する。
dsaas-sns-sqs-s3
という名前のLambda Functionを作成しました。
こんな感じで設定しています。
定期実行設定は、トリガー画面でCloudWatch Eventsで設定します。
コードはこのように作成しました。
s3に保存されるファイル名は、DSaaSイベントのタイムスタンプを設定するようにしています。
import json import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) queueName = 'dsaas-sns' maxNumberOfMessages = 10 AWS_S3_BUCKET_NAME = 'dsaas-sns' def lambda_handler(event, context): try: logger.info(event) queue = boto3.resource('sqs').get_queue_by_name( QueueName = queueName ) messages = queue.receive_messages( MaxNumberOfMessages = maxNumberOfMessages ) entries = [] items = [] for message in messages: entries.append({ "Id": message.message_id, "ReceiptHandle": message.receipt_handle }) items.append({ "UnsubscribeURL": json.loads(message.body)['UnsubscribeURL'], "SigningCertURL": json.loads(message.body)['SigningCertURL'], "Signature": json.loads(message.body)['Signature'], "SignatureVersion": json.loads(message.body)['SignatureVersion'], "Timestamp": json.loads(message.body)['Timestamp'], "Message": json.loads(message.body)['Message'], "TopicArn": json.loads(message.body)['TopicArn'], "MessageId": json.loads(message.body)['MessageId'], "Type": json.loads(message.body)['Type'] }) s3 = boto3.resource('s3') bucket = s3.Bucket(AWS_S3_BUCKET_NAME) for item in items: PUT_OBJECT_KEY_NAME = item['Timestamp'] obj = bucket.Object(PUT_OBJECT_KEY_NAME) body = item['Message'] response = obj.put( Body=body.encode('utf-8'), ContentEncoding='utf-8', ContentType='text/plane' ) response = {} if len(entries) != 0: response = queue.delete_messages( Entries = entries ) logger.info(response) return response except Exception as e: logger.error(e) raise e
3. IAM権限を追加する。
sns-sqs-s3.lambda_roleにSQSとs3の権限を追加します。
4. 実行結果
SQSのメトリクスでキューが順調に処理されているのがわかりますね。
s3にもイベントがオブジェクトとして保存されています。
オブジェクトの内容もDSaaSのイベントだけになっていますね。
既存のイベントをSNSトピックとして飛ばすことはできませんが、
この実装によってDSaaS上で新たに発生したイベントは全てs3に保存できるようになりました。
今日はここまでです。
お疲れ様でした。