こんな感じです。

Untitled%2814%29

API(Boto3)でPutするのは、まあ当たり前ですが、Getに関しては、
Lambdaの"event source"にKinesisというものがあり、それを設定することにより
イベントドリブンでKinesisのデータをLambdaで処理することができます。

ということで、作ってみます。

Kinesisの作成

AWSマネジメントコンソールより下記のように作成します。

2015-11-06_07-08-07

2015-11-06_07-14-19

2015-11-06_07-15-59

IAMの設定

Lambdaに設定するIAMロール(lambda_basic_exection)にKinesisが扱える
マネージドポリシー(AmazonKinesisFullAccess)をアタッチします。

2015-11-06_07-35-10

2015-11-06_07-36-35

2015-11-06_07-37-17

KinesisへPutするLambdaファンクション

コードは、こんな感じです。

import logging
import boto3

def lambda_handler(event, context):

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        logger.info(event)
        response = boto3.client('kinesis').put_record(
            StreamName = "test",
            Data = "test",
            PartitionKey = "test",
        )
        logger.info(response)
        return response

    except Exception as e:
        logger.error(e)
        raise e

実行すると、レスポンスは次のように返ってきます。

{
  "ShardId": "shardId-000000000000",
  "ResponseMetadata": {
    "HTTPStatusCode": 200,
    "RequestId": "eef4ccde-840d-11e5-9d26-6b863e58e437"
  },
  "SequenceNumber": "49556079134761214959115455796946952254907110183804076034"
}

Kinesisのモニタリングで確認しても、
ちゃんとPutレコードがカウントされていることがわかります。

2015-11-06_07-59-50

KinesisからGetするLambdaファンクション

コードは、こんな感じです。

import logging
import base64

def lambda_handler(event, context):

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:

        logger.info(event)

        payloads = []
        for record in event['Records']:
            payload = base64.b64decode(record["kinesis"]["data"])
            payloads.append(payload)
        logger.info(payloads)

        return payloads

    except Exception as e:
        logger.error(e)
        raise e

そして、Event Sourceとして、Kinesisを追加します。

2015-11-06_07-54-49

2015-11-06_07-56-16

2015-11-06_09-16-28

CloudWatch Logsを確認すると、ちゃんとKinesisのEvent Sourceを設定した、
Lambdaファンクションが実行されログが出力されていることがわかります。

2015-11-06_08-15-09

CloudWatchのMetricsでもGetリクエストで確認することができます。

2015-11-06_09-12-41

元記事はこちら

Lambda(Python)からKinesisにPut(API)してLambda(Python)でGet(Event)する