ども、cloudpackかっぱ (@inokara) です。

はじめに

Raspberry Pifluentd をインストールした勢いで fluent-plugin-kinesis をインストールして触ってみました。


準備

fluent-plugin-kinesis のインストール

ドキュメントに従ってインストール。

sudo gem install bundler --no-ri --no-rdoc -V
git clone https://github.com/awslabs/aws-fluent-plugin-kinesis.git
cd aws-fluent-plugin-kinesis/
sudo bundle install
sudo rake build
sudo fluent-gem install pkg/fluent-plugin-kinesis --no-ri --no-rdoc -V

Amazon Kinesis の準備

Stream の作成

SDK を叩きます。

[1] pry(main)> require 'aws-sdk'
=> true
[2] pry(main)> AWS.config.credentials
=> {:access_key_id=>"AKXXXXXXXXXXXXXXXXXXXX", :secret_access_key=>"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
=> 
[3] pry(main)> st = AWS::Kinesis.new(region: "ap-northeast-1").client
=> #
[4] pry(main)> st.create_stream(:stream_name => "raspi", :shard_count => 1)
=> {}
[5] pry(main)>

上記で raspi_test という Stream が Shard 1 で出来上がりました。

[6] pry(main)> st.list_streams
=> {:stream_names=>["raspi"], :has_more_streams=>false}

ちゃんと出来ていますな。

fluentd の設定

fluentd の設定は以下のように書きました。

<source>
  type forward
</source>
<match raspi.**>
  type copy
  <store>
    type stdout
  </store>
  <store>
    type kinesis

    stream_name raspi

    aws_key_id AKXXXXXXXXXXXXXXXXXXXXXXX
    aws_sec_key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    region ap-northeast-1

    partition_key name
  </store>
</match>

テスト

fluentd の起動

fluentd を以下のように起動します。

fluentd -c test.conf -v -o test.log &

test.log は以下のように記録されました。

2014-11-01 22:08:13 +0000 [info]: fluent/supervisor.rb:223:supervise: starting fluentd-0.10.56
2014-11-01 22:08:13 +0000 [info]: fluent/supervisor.rb:325:read_config: reading config file path="test.conf"
2014-11-01 22:08:15 +0000 [info]: fluent/engine.rb:90:block in configure: gem 'fluent-plugin-kinesis' version '0.2.0'
2014-11-01 22:08:15 +0000 [info]: fluent/engine.rb:90:block in configure: gem 'fluentd' version '0.10.56'
2014-11-01 22:08:15 +0000 [info]: fluent/engine.rb:94:configure: using configuration file: <ROOT>
  <source>
    type forward
  </source>
  <match raspi.**>
    type copy
    <store>
      type stdout
    </store>
    <store>
      type kinesis
      stream_name raspi
      aws_key_id AKXXXXXXXXXXXXXXXXXXXXXXX
      aws_sec_key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
      region ap-northeast-1
      partition_key name
    </store>
  </match>
</ROOT>
2014-11-01 22:08:15 +0000 [info]: fluent/engine.rb:107:block in configure: adding source type="forward"
2014-11-01 22:08:15 +0000 [info]: fluent/engine.rb:124:block in configure: adding match pattern="raspi.**" type="copy"
2014-11-01 22:08:15 +0000 [debug]: plugin/out_copy.rb:40:block in configure: adding store type="stdout"
2014-11-01 22:08:15 +0000 [debug]: plugin/out_copy.rb:40:block in configure: adding store type="kinesis"
2014-11-01 22:08:18 +0000 [info]: plugin/in_forward.rb:75:listen: listening fluent socket on 0.0.0.0:24224

起動しました。何度も申し上げて恐縮ですが Raspberry Pi 上で起動しています。

Amazon Kinesis にレコードを投げてみる

投げると言っても fluent-cat するだけです。

echo '{"name":"kappa","action":"hage"}' | fluent-cat raspi.test

ログにも以下のように記録されています。

2014-11-01 22:12:38 +0000 raspi.test: {"name":"kappa","action":"hage"}

重要なのは Kinesis に送られているかですが…

{:sequence_number=>"49544702721808617341148525113521629367322285252229988354", :data=>"eyJuYW1lIjoia2FwcGEiLCJhY3Rpb24iOiJoYWdlIiwidGltZSI6IjIwMTQtnMTEtMDFUMjI6MTI6MzhaIiwidGFnIjoicmFzcGkudGVzdCJ9n", :partition_key=>"kappa"}

一応、上記のように Stream から Data Record を取得すること出来ましたので Raspberry Pi から fluent-cat したレコードは登録されていると判断出来るかと思います。

おおっって感じです。


最後に

とりあえず触ってみたレベルで大変恐縮ですが、fluent-plugin-kinesis が問題なく Raspberry Pi にインストールして動いたのはなぜか感動でした。Kinesis 自体をちゃんと理解出来ていないのが辛いですが、引続き Raspberry Pi と Kinesis 等の AWS サービスとの連携等も試していければと思います。

元記事は、こちらです。
Raspberry Pi で fluent-plugin-kinesis を使ったメモ