超メモシリーズ。
何がしたいのか?
CloudWatch Logs Subscriptions でフィルタされたレコードを Lambda で Twitter に呟かせたい。
メモ
Lambda にはどのような状態でレコードが入ってくるのか?
CloudWatch Logs Subscriptions でフィルタされたレコードは Kinesis ストリームを介して Lambda の event
オブジェクトには以下のような JSON で入ってくる。
{ "awslogs": { "data": "H4sIAAAAAAAAAJ2RTY+CMBCG/wrpXgXb0lLgZrJqTPbjoDdjTJWJNlIgpega43/fAbMfpz3sqZN3Ju887/RGLLStPsDq2gDJyfNkNdm+TpfLyXxKRqS+VOBQ5pIxSdM4SShFuawPc1d3DXaKen8CF6Ly0JfegbbYaGunrbYQNqaB0lSw5ZTJkNGQKRxtu127d6bxpq5mpvTgWpKvyYu2u0I/TLb+AuDJZvCdnqHy/ciNmALtY865SngsOE0TJWjMleI8TZhgMhNKZkpKpSjPUFY8EzLFkUT09N5gZq8t4jMhJMNUTCSKj75ugfaLUbD+5k1XNMuFzGMaSUFxY/DEkk2weJu9B2EY5MHR+yYfjxvLZeTROzJVfdJOR/vajk/Xrr1245/00dHbEjngwzu991DMDJQFZruRQnvo8fqT/7m9v2DDcSwfKoZvGPY/cB7g+wqGEhnJ72D/Qb3fN/dPf1Js6SkCAAA=" } }
Python で処理しようとする場合には以下のように JSON がパースされた状態で event
オブジェクトに入ってくる。
data
キーから値を取り出してレコードを解析(Base64 をデコードして解凍)する。解析すると以下のような状態の JSON になる。
{ "logEvents": [ { "extractedFields": { "message": "http://pm25.test.inokara.com/kyusyu/2015-10-17.html", "lev": "INFO", "lv": "I,", "sp1": "--", "sp2": ":", "datetime": "2015-10-18T09:45:30.540420 #16" }, "message": "I, [2015-10-18T09:45:30.540420 #16] INFO -- : http://pm25.test.inokara.com/kyusyu/2015-10-17.html", "timestamp": 1445136614672, "id": "32227623420867403277228614159475975577029772729458032640" } ], "subscriptionFilters": [ "LambdaStream_tweet" ], "logStream": "soramame-pipeline_2015-10-17", "logGroup": "docker-log", "owner": "251150836600", "messageType": "DATA_MESSAGE" }
最終的には logEvents
の extractedFields.message
にある URL を取り出して Twitter に呟かせたい。
ということで
以下のような Lambda ファンクションとなった。
# -*- coding: utf-8 -*- import sys, json import datetime import ConfigParser import twitter import base64 import gzip from StringIO import StringIO def extract_url(record): # Base64 デコードして解凍してデータを取得する decoded_data = record.decode("base64") json_data = json.loads(gzip.GzipFile(fileobj=StringIO(decoded_data)).read()) for data in json_data['logEvents']: return data['extractedFields']['message'] def lambda_handler(event, context): # config.ini から Twitter API の Credential 情報を取得 c = ConfigParser.SafeConfigParser() c.read("./config.ini") # Twitter API を初期化 api = twitter.Api( consumer_key = c.get('tw','consumer_key'), consumer_secret = c.get('tw','consumer_secret'), access_token_key = c.get('tw','access_token_key'), access_token_secret = c.get('tw','access_token_secret'), ) # event オブジェクトからレコードを取得する url = extract_url(event['awslogs']['data']) d = datetime.date.today() - datetime.timedelta(1) d = u"%s 年 %s 月 %s 日 PM2.5 の状況(九州地方のみ): " % (d.year, d.month, d.day) print d + url print api.PostUpdate(d + url)
早速…呟かせてみる
以下のように呟かせてみた。
# # テストログを用意 # % cat test.log [ { "timestamp": 1445136614672, "message": "I, [2015-10-18T09:45:30.540420 #16] INFO -- : http://pm25.test.inokara.com/kyusyu/2015-10-17.html" } ] # # ログを put する # % aws --region ap-northeast-1 logs put-log-events --log-group-name docker-log --log-stream-name ${stream_name} --log-events file://test.log --sequence-token ${token}
おお、きた。
後で…
も少し手順をまとめよう。
以上。