ども、cloudpackかっぱ (@inokara) です。

はじめに

前回前々回からの続きです。

室内の温度を計測して fluentd を経由してログに出力していましたが、これを DynamoDB Local に突っ込んでみるメモです。

参考

Dynamic ダイ○マ~

以下のようなイメージです…

DynamoDB Local に fluentd 経由でデータを格納する: イメージ図

fluentd プラグイン

fluentd から DynamoDB にデータを送る場合には以下のような Output プラグインがありますが…

fluent-plugin-dynamodb は DynamoDB Local には対応していない(SDK が対応していないようです)、fluent-plugin-dynamodb-alt は DynamoDB Local には対応しているものの試す時間がありませんでした…。

exec_filter

ということで、exec_filter Output Plugin と以下のようなスクリプトを併用しました。

require 'aws-sdk-core'
require 'json'
require 'fluent-logger'
require 'msgpack'

#log = Fluent::Logger::FluentLogger.new(nil, :host=>'localhost', :port=>24224)
def put_data(time,temperature)

  dynamo_db = Aws::DynamoDB::Client.new(
      endpoint: 'http://xxx.xxx.xxx.xxx:8080',
      region: 'ap-northeast-1'
  )

  dynamo_db.put_item(
      table_name: 'temperature',
      item:  {
          temperature:  "#{temperature}",
          time:     "#{time}"
      }
  )
end

while line = STDIN.gets
  record = JSON.parse(line)
  put_data record['time'],record['temperature']
  msg = Hash["time",record['time'],"temperature",record['temperature']]
  print msg.to_msgpack
  #log.post("room.temperature.put", {"time"=> record['time'], "temperature"=> record['temperature'], "put"=> "success"})
end

情弱ですいません…。

DynamoDB のテーブル作成

aws-cli を利用して以下のようにテーブルを作成しました。

aws dynamodb create-table --endpoint-url http://xxx.xxx.xxx.xxx:8080 
  --table-name temperature 
  --attribute-definitions AttributeName=temperature,AttributeType=S AttributeName=time,AttributeType=S 
  --key-schema AttributeName=temperature,KeyType=HASH AttributeName=time,KeyType=RANGE 
  --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1

とりあえず DynamoDB Local に突っ込むことが目的ということで…両方のカラム(Attribute)は AttributeType=SString で定義しています。そもそも KVS なのにカラムの定義が必要となるのは、あくまでも Hash Key または Range Key に利用する Attribute のみで、それ以外の Attribute は定義は不要です。

fluentd の設定

以下のように fluentd.conf を設定します。


  type       exec_filter
  command    /usr/bin/ruby /home/pi/fluentd/dynamodb.rb
  in_format  json
  out_format msgpack
  tag        room.temperature.put
  flush_interval 1s



  type file
  path /home/pi/fluentd/put_dynamodb.log

以下のように fluentd を起動します。

fluentd -c fluentd.conf -l fluentd.log

既に室温を収集するスクリプトを動かしている状態ですので DynamoDB Local には室温データは放り込まれているはずですが…。

しばらくすると…

以下のようなスクリプトで DynamoDB Local にデータが放り込まれていることを確認します。

#!/usr/bin/env ruby

require 'aws-sdk-core'

dynamo_db = Aws::DynamoDB::Client.new(
    endpoint: 'http://xxx.xxx.xxx.xxx:8080',
    region: 'ap-northeast-1'
)

result = dynamo_db.scan(
  table_name: 'temperature'
)

result.items.each do |item|
  item.each do |key, value|
    puts "#{key}: #{value}"
  end
end

実行すると以下のように出力されます。

time: 1420275815
temperature: 23.154296875
time: 1420275827
temperature: 23.154296875
time: 1420275840
temperature: 23.154296875
time: 1420275852
temperature: 23.154296875
time: 1420275865
temperature: 23.154296875
time: 1420275877
temperature: 23.154296875
time: 1420275890
temperature: 23.154296875
time: 1420275964
temperature: 23.154296875

おお。

ということで…

DynamoDB の本来の機能を使いきっておらず大変恐縮ですが…DynamoDB Local に fluentd から室温データを放り込んでみました。テーブル設計がいい加減過ぎるのでとりあえず放り込んでみました感満載ですいません。ちゃんと DynamoDB については勉強したいなと思っているので引き続き DynamoDB Local で課金を気にせず勉強していければなと思います。あと、fluentd の exec_filter はプラグインを作るほどでは無い処理を行わせるには便利ですな。(負荷とか気になりますが…)

元記事はこちらです。
DynamoDB Local を Docker コンテナで動かすメモ(2)