tl;dr

logstash を使っていて、filter でちょっと凝ったことをしたいと思って調べていたら Ruby のコードが直接書けるようなので導入してみた。


www.elastic.co

今回やりたかったこと

  • input で入ってきた event オブジェクトの中にある date 型フィールドデータのフォーマットを整形したデータを追加したい
  • 具体的には ISO8601 で入ってきた Datetime データを HH:MM:SSHH:MM に整形してそれぞれ個別のフィールドに追加したい

サンプル実装

logstash.conf

logstash 5.x とそれ以前のバージョンで書き方が若干異なるので注意。

  • logstash 5.x の場合
input {
  tcp {
    port => 5000
  }
}

filter {
  ruby {
    code => "
      event.set('datetime_hour_min_sec', DateTime.parse(event.get('message')).strftime('%H:%M:%S'));
      event.set('datetime_hour_min', DateTime.parse(event.get('message')).strftime('%H:%M'));
    "
  }
}

output {
  stdout {
    codec => "rubydebug"
  }
}
  • logstash 5.x 以前
input {
  tcp {
    port => 5000
  }
}

filter {
  ruby {
    code => "
      event['datetime_hour_min_sec'] = DateTime.parse(event['message']).strftime('%H:%M:%S');
      event['datetime_hour_min'] = DateTime.parse(event['message']).strftime('%H:%M');
    "
  }
}

output {
  stdout {
    codec => "rubydebug"
  }
}

詳細については、以下の注意点にて。

ELK Stack の用意

ちょっと脱線して Elasticsearch + Logstash + Kibana 環境については以下の docker-compose 用の YAML を利用した。

docker-elk - The ELK stack powered by Docker and Compose.

github.com

超簡単に ELK Stack を作成することが出来た。

$ git clone https://github.com/deviantony/docker-elk.git
$ cd docker-elk
$ docker-compose up -d
$ docker ps
CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                            NAMES
1ff0e05da5ab        dockerelk_logstash        "/usr/local/bin/do..."   15 minutes ago      Up 12 minutes       5044/tcp, 0.0.0.0:5000->5000/tcp, 9600/tcp       dockerelk_logstash_1
3099200a271e        dockerelk_kibana          "/bin/sh -c /usr/l..."   8 hours ago         Up 8 hours          0.0.0.0:5601->5601/tcp                           dockerelk_kibana_1
24b575c350b7        dockerelk_elasticsearch   "/bin/bash bin/es-..."   8 hours ago         Up 8 hours          0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   dockerelk_elasticsearch_1

以下のように logstash にメッセージを投げることが出来る。

$ echo "hello" | nc localhost 5000

デモ

MacOS X 上で以下のように logstash にメッセージを投げる

$ echo $(date +%FT%T.$(gdate +%3N)Z) | nc localhost 5000
  • logstash では以下のように出力される
$ docker-compose logs -f logstash
...
logstash_1       | {
logstash_1       |                "@timestamp" => 2017-06-15T14:49:44.455Z,
logstash_1       |         "datetime_hour_min" => "23:49",
logstash_1       |                      "port" => 58782,
logstash_1       |     "datetime_hour_min_sec" => "23:49:44",
logstash_1       |                  "@version" => "1",
logstash_1       |                      "host" => "172.18.0.1",
logstash_1       |                   "message" => "2017-06-15T23:49:44.426Z"
logstash_1       | }

datetime_hour_mindatetime_hour_min_sec というキーが追加され、意図した通りに時分秒、時分が値として設定されている。

event オブジェクトへのアクセス方法

Event API

logstash 5.0 で再実装された Event API に合わせて event オブジェクトへのアクセス方法が変更されている。


www.elastic.co

以下、ドキュメントの抜粋。

In 5.0, we’ve re-implemented the Event class and its supporting classes in pure Java. Since Event is a critical component in data processing, a rewrite in Java improves performance and provides efficient serialization when storing data on disk.

Pure Jave で書き換えることで、データ保存時のシリアライズのパフォーマンス向上を図った(と読める)ようだ。

GitHub issue

This is a tracking issue related to the Event API refactor which will break compatibility for the 5.0 release.Historically it was assumed that the returned values from the Ruby Event hash-like int...

www.elastic.co

As per #5140 we have decided to remove Ruby hash-like API and expose new getter and setter in 5.0.This is a WIP proposal and is open for discussions. The identified undefined behaviours will proba...

github.com

logstash 5.0 から event オブジェクトへのアクセス方法は従来の hash like なアクセス方法を廃止して、getter と setter でアクセスするように変更されたというのが概要。

5.x 以前

hash にアクセスするような書き方でアクセス出来る。

filter {
  ruby {
    code => "
      event['datetime_hour_min_sec'] = DateTime.parse(event['message']).strftime('%H:%M:%S');
      event['datetime_hour_min'] = DateTime.parse(event['message']).strftime('%H:%M');
    "
  }
}

5.x 移行

GetAPI と SetAPI を利用してアクセスする。

filter {
  ruby {
    code => "
      event.set('datetime_hour_min_sec', DateTime.parse(event.get('message')).strftime('%H:%M:%S'));
      event.set('datetime_hour_min', DateTime.parse(event.get('message')).strftime('%H:%M'));
    "
  }
}

影響範囲

きっと、ちゃんとフォローされている環境であれば、何てこと無いんだろうけど、うっかりバージョンアップしたり、ググった設定をそのまま利用しようとするとハマりそう(実際にハマった)。

その他の filter

mutate

Ruby filter を利用する前に導入を検討していた。


www.elastic.co

add_field で任意のキーと値を追加したり…

filter {
  mutate {
    add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
  }
}

split で任意のキーの値を分割したり出来る。

filter {
  mutate {
     split => { "fieldname" => "," }
  }
}

全てを網羅出来ていないが、ドキュメントを見る限りではかなり柔軟に mutate 出来る。

grok

logstash に適当に入ってきたレコードを一定のルールに従って構造化してくれるスゴイやつ(という認識)。


www.elastic.co

こちら の例を見ると解りやすいので、そのまま写経。
入ってくるログレコードは Web サーバーのアクセスログっぽい。

55.3.244.1 GET /index.html 15824 0.043

Grok フィルタを以下のように定義する。

filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

出力は stdout プラグインの codec は rubydebug を利用して確認。

$ docker-compose logs -f logstash
...
logstash_1       | {
logstash_1       |       "duration" => "0.043",
logstash_1       |        "request" => "/index.html",
logstash_1       |     "@timestamp" => 2017-06-16T23:49:13.404Z,
logstash_1       |         "method" => "GET",
logstash_1       |           "port" => 36390,
logstash_1       |          "bytes" => "15824",
logstash_1       |       "@version" => "1",
logstash_1       |           "host" => "172.18.0.1",
logstash_1       |         "client" => "55.3.244.1",
logstash_1       |        "message" => "55.3.244.1 GET /index.html 15824 0.043"
logstash_1       | }

上記のように clientmethod 等、よしなに解析してくれている。

以上

logstash 奥深い。

元記事はこちら

小ネタ道場一本勝負 〜 logstash の Ruby filter メモ 〜