77

私のpythonプログラムと統合されたElasticSearchにメッセージを保存する必要があります。今、メッセージを保存しようとしているのは次のとおりです。

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

つまり、10 個のメッセージがある場合、コードを 10 回繰り返す必要があります。そこで、スクリプト ファイルまたはバッチ ファイルを作成してみます。ElasticSearch Guideを確認したところ、BULK APIが利用可能です。フォーマットは次のようになります。

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

私がしたことは:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

また、curl ツールを使用してドキュメントを保存します。

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

次に、Python コードを使用してファイルを Elastic Search に保存します。

4

5 に答える 5

144
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
  {
    "_index": "tickets-index",
    "_type": "tickets",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 10)
]

helpers.bulk(es, actions)
于 2014-01-14T12:58:58.700 に答える
51

@justinachen のコードは py-elasticsearch を使い始めるのに役立ちましたが、ソース コードを調べた後、簡単な改善を行うことができました。

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)

helpers.bulk()すでにセグメンテーションを行っています。そして、セグメンテーションとは、毎回サーバーに送信されるチャックを意味します。送信されるドキュメントのチャンクを減らしたい場合は、次のようにします。helpers.bulk(es, actions, chunk_size=100)

開始するための便利な情報:

helpers.bulk()の単なるラッパーですhelpers.streaming_bulkが、最初のものは便利なリストを受け入れます。

helpers.streaming_bulkに基づいているElasticsearch.bulk()ため、何を選択するかについて心配する必要はありません。

したがって、ほとんどの場合、helpers.bulk() だけで十分です。

于 2014-04-16T19:30:16.267 に答える
41

(このスレッドで言及されている他のアプローチでは、ES の更新に python list を使用していますが、これは現在、特に何百万ものデータを ES に追加する必要がある場合には適切な解決策ではありません)

より良いアプローチは、Python ジェネレーターを使用することです。つまり、メモリを使い果たしたり、速度を大幅に低下させたりすることなく、大量のデータを処理します

以下は、実用的なユースケースのスニペットの例です - 分析のためにnginxログファイルからESにデータを追加します。

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

このスケルトンは、ジェネレーターの使用法を示しています。必要に応じて、ベア マシンでも使用できます。そして、これを拡張して、ニーズにすばやく合わせることができます。

Python Elasticsearch リファレンスはこちら

于 2016-08-21T21:36:02.390 に答える