検証の為に

大量データを Amazon Elasticsearch Service に放り込みたいです。
データは

datasets - 研究用データセットです。

github.com

こちらを利用させて頂きます。
また、

以下でデータを入れる方法が書いてないので、入れるためのメモです。http://code46.hatenablog.com/entry/2014/01/21/115620データセットはこちらにあります。* http://b...

qiita.com

上記の記事を参考にさせて頂きました。ありがとうございます。

放り込む

放り込む前に csv から json データにする

上記の記事を参考にさせて頂いて Python で書いてみました。標準入力から csv を読み込もうと試行錯誤しましたが、とりあえず csv ファイルを読み込む体でいきます。

# -*- coding: utf-8 -*-

import csv
import json
import random
import sys

INDEX = "ldgourmet"
TYPE  = "restaurant"

#
# http://racchai.hatenablog.com/entry/2016/04/16/173000 を参考にさせて頂きました.
# ありがとうございます.
#
def random_string(length, seq='0123456789abcdefghijklmnopqrstuvwxyz'):
    sr = random.SystemRandom()
    return ''.join([sr.choice(seq) for i in xrange(length)])

with open('restaurants.csv', 'r') as f:
    reader = csv.reader(f)
    header = next(reader)

    for row in reader:
        # index = { 'index': { '_index': INDEX, '_type': TYPE, '_id': random_string(16) }} # bulk API だと Index を付けなきゃいけないけど...
        # print json.dumps(index)
        print json.dumps(dict(zip(header, row)))

レストランデータを展開して、上記のスクリプトを以下のように実行します。

$ tar zxvf ldgourmet.tar.gz
$ ls -l restaurants.csv
-rw-r--r-- 1 cloudpack cloudpack 59311909 Apr 22  2011 restaurants.csv
$ python csv2json.py > restaurants.json

インデックスの作成

レストランデータを放り込むインデックスを作成する。
ひとまず放り込むだけなので、マッピングは超シンプルに。

$ cat mapping.json
{
  "mappings": {
    "restaurant": {
      "properties": {
        "restaurant_id": {
          "type": "integer"
        },
        "name": {
          "type": "string",
        },
        "name_alphabet": {
          "type": "string",
        },
        "name_kana": {
          "type": "string",
        },
        "address": {
          "type": "string",
        },
        "description": {
          "type": "string",
        },
        "purpose": {
          "type": "string",
        },
        "category": {
          "type": "string",
          "analyzer": "whitespace"
        },
        "photo_count": {
          "type": "integer"
        },
        "menu_count": {
          "type": "integer"
        },
        "access_count": {
          "type": "integer"
        },
        "closed": {
          "type": "boolean"
        },
        "location": {
          "type": "geo_point",
          "store": "yes"
        }
      }
    }
  }
}

以下のように実行する。

#
# インデックスを作成
#
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='ldgourmet'
_TARGET=${_ESS_ENDPOINT}/${_INDEX}
curl -XPUT ${_TARGET} -d @mapping.json

ここで真打ち stream2es の登場

stream2es については @johtani さんの以下の記事がバイブルです。

kopfの記事の続きも書く必要があるんだけど、こんなツイートを見つけてしまったので。。。 ElasticsearchのBulk APIの仕様、JSONファイルをいい感じに加工して置かなければならないしハマりどころ多い。 http://t.co/hmfycqZlqk— Kenta …

blog.johtani.info

stream2es を動かすには Java8 以上が必要ですので、事前に Java8 をインストールしておきましょう。

$ java -version
openjdk version "1.8.0_121"
OpenJDK Runtime Environment (build 1.8.0_121-b13)
OpenJDK 64-Bit Server VM (build 25.121-b13, mixed mode)

あとは、以下のようにダウンロードして実行権限を付与した後にデータの投入を行います。

#
# stream2es を取得
#
curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es

#
# データの投入
#
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='ldgourmet'
_TYPE='restaurant'
_TARGET=${_ESS_ENDPOINT}/${_INDEX}/${_TYPE}
cat restaurants.json | ./stream2es stdin --target ${_TARGET}

実行すると以下のように出力されます。

$ cat restaurants.json | ./stream2es stdin --target ${_TARGET}
2017-03-31T08:56:18.797+0000 INFO  01:36.489 2220.3d/s 1848.3K/s (174.2mb) indexed 214236 streamed 214236 errors 0
2017-03-31T08:56:18.823+0000 INFO  done

ひとまず

ちゃんと放り込めたかを Count API を使って確認します。

_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='/ldgourmet/restaurant'
_TARGET=${_ESS_ENDPOINT}${_INDEX}
$ curl -s -XGET ${_TARGET}/_count | jq .
{
  "count": 214236,
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  }
}

問題なさそうです。
これで検証を進められそうです。

以上

stream2es を使えば

bulk API を駆使しなくても(bulk API を利用する為にデータを整形しなくても)大量データをサクッと放り込んでくれました。また、各ドキュメントの ID についても自動で採番してくれるのも嬉しい限りです。

stream2es ヘルプ

単に標準入力から Elasticsearch にバルクでデータを放り込んでくれるだけでは無さそうです。

$ ./stream2es --help
2017-03-31T12:22:27.534+0000 INFO  Copyright 2013 Elasticsearch

Usage: stream2es [CMD] [OPTS]

Available commands: wiki, twitter, stdin, es

Common opts:
   --authinfo   Stored stream credentials (default: "/home/cloudpack/.authinfo.stream2es")
--clobber       Use elasticsearch 'index' operation, clobbering existing documents, no-clobber uses 'create' which will skip/error existing documents (default: false)
-h --help       Display help (default: false)
--http-insecure Don't verify peer cert (default: false)
   --http-keystore /path/to/keystore (default: null)
   --http-keystore-pass Keystore password (default: null)
   --http-trust-store /path/to/keystore (default: null)
   --http-trust-store-pass Truststore password (default: null)
--indexing      Whether to actually send data to ES (default: true)
   --log        Log level (trace debug info warn error fatal report) (default: "info")
   --mappings   Index mappings (default: null)
-d --max-docs   Number of docs to index (default: -1)
--offset        Add __s2e_offset__ field TO EACH DOCUMENT with the sequence offset of the stream (default: false)
-q --queue-size Size of the internal bulk queue (default: 40)
--replace       Delete index before streaming (default: false)
   --settings   Index settings (default: null)
-s --skip       Skip this many docs before indexing (default: 0)
   --stream-buffer Buffer up to this many pages (default: 50)
   --stream-timeout Wait seconds for data on the stream (default: -1)
   --target     ES location (default: "http://localhost:9200")
   --tee        Save json request payloads as files in path (default: null)
   --tee-bulk   Save bulk request payloads as files in path (default: null)
--tee-errors    Create error-{id} files (default: true)
-v --version    Print version (default: false)
-w --workers    Number of indexing threads (default: 2)

ElasticsearchStream opts:
-b --bulk-bytes Bulk size in bytes (default: 1048576)
   --query      Query to _scan from source (default: "{"query":{"match_all":{}}}")
-q --queue-size Size of the internal bulk queue (default: 1000)
   --scroll-size Source scroll size (default: 500)
   --scroll-time Source scroll context TTL (default: "60s")
   --source     Source ES url (default: null)
--source-http-insecure Don't verify peer cert (default: false)
   --source-http-keystore /path/to/keystore (default: null)
   --source-http-keystore-pass Keystore password (default: null)
   --source-http-trust-store /path/to/keystore (default: null)
   --source-http-trust-store-pass Truststore password (default: null)
   --target     Target ES url (default: null)

GeneratorStream opts:
-b --bulk-bytes Bulk size in bytes (default: 102400)
   --dictionary Dictionary location (default: "/usr/share/dict/words")
   --fields     Field template (str, string, dbl, double, int, integer) (default: "f1:str:1")
-q --queue-size Size of the internal bulk queue (default: 40)
   --stream-buffer Buffer up to this many docs (default: 100000)
   --target     ES index (default: "http://localhost:9200/foo/t")

StdinStream opts:
-b --bulk-bytes Bulk size in bytes (default: 102400)
-q --queue-size Size of the internal bulk queue (default: 40)
   --stream-buffer Buffer up to this many docs (default: 100)
   --target     Target ES http://host:port/index/type (default: "http://localhost:9200/foo/t")

TwitterStream opts:
--authorize     Create oauth credentials (default: false)
-b --bulk-bytes Bulk size in bytes (default: 102400)
   --key        Twitter app consumer key, only for --authorize (default: null)
-q --queue-size Size of the internal bulk queue (default: 1000)
   --secret     Twitter app consumer secret, only for --authorize (default: null)
   --stream-buffer Buffer up to this many tweets (default: 1000)
   --target     Target ES http://host:port/index/type (default: "http://localhost:9200/twitter/status")
   --track      %%-separated list of strings to filter the stream (default: null)

WikiStream opts:
-b --bulk-bytes Bulk size in bytes (default: 3145728)
-q --queue-size Size of the internal bulk queue (default: 40)
   --source     Wiki dump location (default: "https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2")
   --stream-buffer Buffer up to this many pages (default: 50)
   --target     Target ES http://host:port/index (we handle types here) (default: "http://localhost:9200/wiki")

元記事はこちら

stream2es で大量データをサクッと Amazon Elasticsearch Service に放り込む 〜 もう bulk API を操作しなくても良いかもしれない 〜