遅延していますが…これは「初老丸の独り Advent calendar 2015」の十三日目の記事です。

tl;dr

先日の勉強会で Amazon ES に Web 上から取得したデータを解析して Amazon ES に放り込むデモをやったら Amazon ES にデータを放り込む前にタイムオーバーしてしまって残念だったのでメモ。

原因と対策

原因

原因というか大量のデータを放り込む時には Bulk Insert を利用する方が良さそう。

対策

上記の通り、Bulk API を使うように修正する。今回は Ruby の elasticsearch-ruby を利用する。

github.com

elasticsearch-ruby で Bulk API を利用する場合には bulk メソッドを利用する。

def bulk_post(es_endpoint, data)
  c = Elasticsearch::Client.new log: true, url: es_endpoint
  c.bulk(body: data)
end

data は配列で指定する。

ということで、以下のような感じで soramame-es.rb を書き換えた。

github.com

def processing_soramame_content(uri, check_date_time, check_time)
  index_date = check_date_time.split(' ')
  html = NKF.nkf("--utf8", open(uri).read)
  # Parse content and post to Elasticsearch
  header = ['CHECK_DATE_TIME','CHECK_TIME','mon_st_code','town_name', 'mon_st_name', 'SO2','NO','NO2','NOX','CO','OX','NMHC','CH4','THC','SPM','PM2_5','SP','WD','WS','TEMP','HUM','mon_st_kind']
  doc = Nokogiri::HTML.parse(html, nil, nil)
  bulk = []
  num = 1
  doc.xpath('//tr[td]').each do |tr|
    row = tr.xpath('td').map { |td| td.content.gsub(/[u00A0n]|---/,'NA') }
    row.unshift(check_time)
    row.unshift(check_date_time)
    ary = [header,row].transpose
    h = Hash[*ary.flatten]
    record_index = {
      'index' => {
        '_index' => "pm25_test_" + $d.strftime("%Y-%m-%d"),
        '_type' => 'kyushu',
      }
    }
    record_data = {
      'CHECK_DATE_TIME' => h['CHECK_DATE_TIME'],
      'CHECK_TIME' => h['CHECK_TIME'],
      'town_name' => h['town_name'],
      'mon_st_name' => h['mon_st_name'],
      'PM2_5' => h['PM2_5'].include?('NA') ? h['PM2_5'] = '' : h['PM2_5'],
      'TEMP' => h['TEMP'].include?('NA') ? h['TEMP'] = '' : h['TEMP']
    }
    bulk << record_index
    bulk << record_data
  end
  bulk_post($url, bulk)
end

上記のようにすることでbulkの中身は以下となり Bulk API で放り込まれる。

(snip)

{"index":{"_index":"pm25_test_2015-12-13","_type":"kyushu"}}
{"CHECK_DATE_TIME":"2015-12-13 01:00:00","CHECK_TIME":"01","town_name":"志布志市","mon_st_name":"志布志","PM2_5":"","TEMP":""}
{"index":{"_index":"pm25_test_2015-12-13","_type":"kyushu"}}
{"CHECK_DATE_TIME":"2015-12-13 01:00:00","CHECK_TIME":"01","town_name":"肝属郡東串良町","mon_st_name":"東串良","PM2_5":"","TEMP":""}

比較

Bulk API を利用した場合と通常のドキュメント追加を利用した場合の速度をザクっと比較。

  • レコード数は 205 レコード
  • ファイルに書き出すと 36KB 弱

以下のような結果となった。

  • Elasticsearch で比較
#
# Bulk API
#
$ time ruby soramame-es.rb hb201512130108.html

(snip)

real    0m0.349s
user    0m0.224s
sys     0m0.063s

#
# 通常のドキュメント追加
#
$ time ruby soramame-es.rb hb201512130108.html

(snip)

real    0m0.643s
user    0m0.309s
sys     0m0.085s
  • Amazon ES で比較(インスタンスサイズ:t2.micro.elasticsearch / ストレージ:EBS(Magnetic))
#
# Bulk API
#
$ time ruby soramame-es.rb hb201512130108.html

(snip)

real    0m0.981s
user    0m0.254s
sys     0m0.051s

#
# 通常のドキュメント追加
#
$ time ruby soramame-es.rb hb201512130108.html

(snip)

real    0m44.970s
user    0m0.416s
sys     0m0.635s

Bulk API を使う方が速い。Amazon ES の場合がその差が顕著。Bulk API は速い分だけトレードオフはきっとあるだろうから引き続き調べる。

ということで…

How big is Too big?

ドキュメントより抜粋して意訳。

The entire bulk request needs to be loaded into memory by the node that receives our request, so the bigger the request, the less memory available for other requests. There is an optimal size of bulk request. Above that size, performance no longer improves and may even drop off. The optimal size, however, is not a fixed number. It depends entirely on your hardware, your document size and complexity, and your indexing and search load.

  • Bulk リクエスト(Bulk API)は Elasticsearch ノードのメモリにロードして処理する
  • ということは Elasticsearch ノードのメモリサイズによって Bulk API の性能は変化する
  • Bulk API のリクエストサイズによってはパフォーマンスが劣化する可能性がある
  • Bulk API のリクエストの最適値は Elasticsearch ノードのハードウェア性能、リクエストのドキュメントサイズや内容等によって変わる

なるほど、なるほど。

Amazon ES に対する要望

  • マネジメントコンソールからインデックスを追加、削除出来るとちょっとうれしい(API からでもそれほど難しくないけど)

以上

取り急ぎ、メモでした。(後ほど Bulk API についてもう少し書く)

元記事はこちら

(ショロカレ 13 日目)Elasticsearch 及び Amazon ES で Bulk API を試すメモ