- tl;dr
- 今回やりたかったこと
- サンプル実装
— logstash.conf
— ELK Stack の用意
— デモ - event オブジェクトへのアクセス方法
— Event API
— GitHub issue
— 5.x 以前
— 5.x 移行
— 影響範囲 - その他の filter
— mutate
— grok - 以上
tl;dr
logstash を使っていて、filter でちょっと凝ったことをしたいと思って調べていたら Ruby のコードが直接書けるようなので導入してみた。
www.elastic.co
今回やりたかったこと
- input で入ってきた event オブジェクトの中にある date 型フィールドデータのフォーマットを整形したデータを追加したい
- 具体的には ISO8601 で入ってきた Datetime データを
HH:MM:SS
とHH:MM
に整形してそれぞれ個別のフィールドに追加したい
サンプル実装
logstash.conf
logstash 5.x とそれ以前のバージョンで書き方が若干異なるので注意。
- logstash 5.x の場合
input { tcp { port => 5000 } } filter { ruby { code => " event.set('datetime_hour_min_sec', DateTime.parse(event.get('message')).strftime('%H:%M:%S')); event.set('datetime_hour_min', DateTime.parse(event.get('message')).strftime('%H:%M')); " } } output { stdout { codec => "rubydebug" } }
- logstash 5.x 以前
input { tcp { port => 5000 } } filter { ruby { code => " event['datetime_hour_min_sec'] = DateTime.parse(event['message']).strftime('%H:%M:%S'); event['datetime_hour_min'] = DateTime.parse(event['message']).strftime('%H:%M'); " } } output { stdout { codec => "rubydebug" } }
詳細については、以下の注意点にて。
ELK Stack の用意
ちょっと脱線して Elasticsearch + Logstash + Kibana 環境については以下の docker-compose 用の YAML を利用した。
github.com
超簡単に ELK Stack を作成することが出来た。
$ git clone https://github.com/deviantony/docker-elk.git $ cd docker-elk $ docker-compose up -d $ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 1ff0e05da5ab dockerelk_logstash "/usr/local/bin/do..." 15 minutes ago Up 12 minutes 5044/tcp, 0.0.0.0:5000->5000/tcp, 9600/tcp dockerelk_logstash_1 3099200a271e dockerelk_kibana "/bin/sh -c /usr/l..." 8 hours ago Up 8 hours 0.0.0.0:5601->5601/tcp dockerelk_kibana_1 24b575c350b7 dockerelk_elasticsearch "/bin/bash bin/es-..." 8 hours ago Up 8 hours 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp dockerelk_elasticsearch_1
以下のように logstash にメッセージを投げることが出来る。
$ echo "hello" | nc localhost 5000
デモ
MacOS X 上で以下のように logstash にメッセージを投げる
$ echo $(date +%FT%T.$(gdate +%3N)Z) | nc localhost 5000
- logstash では以下のように出力される
$ docker-compose logs -f logstash ... logstash_1 | { logstash_1 | "@timestamp" => 2017-06-15T14:49:44.455Z, logstash_1 | "datetime_hour_min" => "23:49", logstash_1 | "port" => 58782, logstash_1 | "datetime_hour_min_sec" => "23:49:44", logstash_1 | "@version" => "1", logstash_1 | "host" => "172.18.0.1", logstash_1 | "message" => "2017-06-15T23:49:44.426Z" logstash_1 | }
datetime_hour_min
と datetime_hour_min_sec
というキーが追加され、意図した通りに時分秒、時分が値として設定されている。
event オブジェクトへのアクセス方法
Event API
logstash 5.0 で再実装された Event API に合わせて event オブジェクトへのアクセス方法が変更されている。
www.elastic.co
以下、ドキュメントの抜粋。
In 5.0, we’ve re-implemented the Event class and its supporting classes in pure Java. Since Event is a critical component in data processing, a rewrite in Java improves performance and provides efficient serialization when storing data on disk.
Pure Jave で書き換えることで、データ保存時のシリアライズのパフォーマンス向上を図った(と読める)ようだ。
GitHub issue
www.elastic.co
github.com
logstash 5.0 から event オブジェクトへのアクセス方法は従来の hash like なアクセス方法を廃止して、getter と setter でアクセスするように変更されたというのが概要。
5.x 以前
hash にアクセスするような書き方でアクセス出来る。
filter { ruby { code => " event['datetime_hour_min_sec'] = DateTime.parse(event['message']).strftime('%H:%M:%S'); event['datetime_hour_min'] = DateTime.parse(event['message']).strftime('%H:%M'); " } }
5.x 移行
GetAPI と SetAPI を利用してアクセスする。
filter { ruby { code => " event.set('datetime_hour_min_sec', DateTime.parse(event.get('message')).strftime('%H:%M:%S')); event.set('datetime_hour_min', DateTime.parse(event.get('message')).strftime('%H:%M')); " } }
影響範囲
きっと、ちゃんとフォローされている環境であれば、何てこと無いんだろうけど、うっかりバージョンアップしたり、ググった設定をそのまま利用しようとするとハマりそう(実際にハマった)。
その他の filter
mutate
Ruby filter を利用する前に導入を検討していた。
www.elastic.co
add_field
で任意のキーと値を追加したり…
filter { mutate { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" } } }
split
で任意のキーの値を分割したり出来る。
filter { mutate { split => { "fieldname" => "," } } }
全てを網羅出来ていないが、ドキュメントを見る限りではかなり柔軟に mutate 出来る。
grok
logstash に適当に入ってきたレコードを一定のルールに従って構造化してくれるスゴイやつ(という認識)。
www.elastic.co
こちら の例を見ると解りやすいので、そのまま写経。
入ってくるログレコードは Web サーバーのアクセスログっぽい。
55.3.244.1 GET /index.html 15824 0.043
Grok フィルタを以下のように定義する。
filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } }
出力は stdout プラグインの codec は rubydebug を利用して確認。
$ docker-compose logs -f logstash ... logstash_1 | { logstash_1 | "duration" => "0.043", logstash_1 | "request" => "/index.html", logstash_1 | "@timestamp" => 2017-06-16T23:49:13.404Z, logstash_1 | "method" => "GET", logstash_1 | "port" => 36390, logstash_1 | "bytes" => "15824", logstash_1 | "@version" => "1", logstash_1 | "host" => "172.18.0.1", logstash_1 | "client" => "55.3.244.1", logstash_1 | "message" => "55.3.244.1 GET /index.html 15824 0.043" logstash_1 | }
上記のように client
や method
等、よしなに解析してくれている。
以上
logstash 奥深い。