こんな感じです。
API(Boto3)でPutするのは、まあ当たり前ですが、Getに関しては、
Lambdaの"event source"にKinesisというものがあり、それを設定することにより
イベントドリブンでKinesisのデータをLambdaで処理することができます。
ということで、作ってみます。
Kinesisの作成
AWSマネジメントコンソールより下記のように作成します。
IAMの設定
Lambdaに設定するIAMロール(lambda_basic_exection)にKinesisが扱える
マネージドポリシー(AmazonKinesisFullAccess)をアタッチします。
KinesisへPutするLambdaファンクション
コードは、こんな感じです。
import logging import boto3 def lambda_handler(event, context): logger = logging.getLogger() logger.setLevel(logging.INFO) try: logger.info(event) response = boto3.client('kinesis').put_record( StreamName = "test", Data = "test", PartitionKey = "test", ) logger.info(response) return response except Exception as e: logger.error(e) raise e
実行すると、レスポンスは次のように返ってきます。
{ "ShardId": "shardId-000000000000", "ResponseMetadata": { "HTTPStatusCode": 200, "RequestId": "eef4ccde-840d-11e5-9d26-6b863e58e437" }, "SequenceNumber": "49556079134761214959115455796946952254907110183804076034" }
Kinesisのモニタリングで確認しても、
ちゃんとPutレコードがカウントされていることがわかります。
KinesisからGetするLambdaファンクション
コードは、こんな感じです。
import logging import base64 def lambda_handler(event, context): logger = logging.getLogger() logger.setLevel(logging.INFO) try: logger.info(event) payloads = [] for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) payloads.append(payload) logger.info(payloads) return payloads except Exception as e: logger.error(e) raise e
そして、Event Sourceとして、Kinesisを追加します。
CloudWatch Logsを確認すると、ちゃんとKinesisのEvent Sourceを設定した、
Lambdaファンクションが実行されログが出力されていることがわかります。
CloudWatchのMetricsでもGetリクエストで確認することができます。
元記事はこちら
「Lambda(Python)からKinesisにPut(API)してLambda(Python)でGet(Event)する」