5

私はpy-elasticsearchバルク@Diolorソリューションが機能することについて混乱してい ます-pythonですが、プレーンな es.bulk() を使用したい

私のコード:

from elasticsearch import Elasticsearch
es = Elasticsearch()
doc = '''\n {"host":"logsqa","path":"/logs","message":"test test","@timestamp":"2014-10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]} \n'''
result = es.bulk(index="logstash-test", doc_type="test", body=doc)

エラーは次のとおりです。

 No handlers could be found for logger "elasticsearch"
Traceback (most recent call last):
  File "./log-parser-perf.py", line 55, in <module>
    insertToES()
  File "./log-parser-perf.py", line 46, in insertToES
    res = es.bulk(index="logstash-test", doc_type="test", body=doc)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/client/utils.py", line 70, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/client/__init__.py", line 570, in bulk
    params=params, body=self._bulk_body(body))
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/transport.py", line 274, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/connection/http_urllib3.py", line 57, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/connection/base.py", line 83, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.TransportError: TransportError(500, u'ActionRequestValidationException[Validation Failed: 1: no requests added;]')

POST 呼び出し用に生成された URL は

/logstash-test/test/_bulk

POST 本文は次のとおりです。

{"host":"logsqa","path":"/logs","message":"test test","@timestamp":"2014-10-02T10:11:25.980256","tags":["複数行","mydate_0.005"]}

だから私は手でカールをしました:このカールはうまくいきません:

> curl -XPUT http://localhost:9200/logstash-test/test2/_bulk -d
> '{"host":"logsqa","path":"/logs","message":"test
> test","@timestamp":"2014-10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]}
> '
>
> {"error":"ActionRequestValidationException[Validation Failed: 1: no requests added;]","status":500}

したがって、エラーは部分的に問題ありませんが、elasticsearch.bulk() が入力引数を適切に管理することを期待していました。

pythonf 関数は次のとおりです。

bulk(*args, **kwargs)
    :arg body: The operation definition and data (action-data pairs), as
        either a newline separated string, or a sequence of dicts to
        serialize (one per row).
    :arg index: Default index for items which don't provide one
    :arg doc_type: Default document type for items which don't provide one
        :arg consistency: Explicit write consistency setting for the operation
    :arg refresh: Refresh the index after performing the operation
    :arg routing: Specific routing value
    :arg replication: Explicitly set the replication type (default: sync)
    :arg timeout: Explicit operation timeout
4

2 に答える 2

7

誰かが現在バルク API を使用しようとしていて、どのような形式にすべきか疑問に思っている場合、私にとっては次のように機能しました。

doc = [
    {
        'index':{
            '_index': index_name,
            '_id' : <some_id>,
            '_type':<doc_type>
        }
    },
    {
        'field_1': <value>,
        'field_2': <value>
    }
]

docs_as_string = json.dumps(doc[0]) + '\n' + json.dumps(doc[1]) + '\n'
client.bulk(body=docs_as_string)
于 2016-05-05T20:26:08.667 に答える
1

github の @HonzaKral から

https://github.com/elasticsearch/elasticsearch-py/issues/135

シルクバックスさん、こんにちは。

バルク API (他のすべての API と同様) は、elasticsearch 自体のバルク API 形式に非常に厳密に従うため、本文は次のようにする必要があります。

doc = '''{"index": {}}\n{"host":"logsqa","path":"/logs","message":"test test","@timestamp":"2014- 10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]}\n''' 動作するようにします。あるいは、それらの 2 つの dict のリストにすることもできます。

これは、python から操作するには複雑で扱いにくい形式です。そのため、elasticsearch.helpers.bulk (0) でバルクを操作するためのより便利な方法を作成しようとしました。ドキュメントのイテレータを受け入れるだけで、そこからオプションのメタデータ (_id、_type など) を抽出し、一括リクエストを構築 (および実行) します。受け入れられる形式の詳細については、上記の streaming_bulk のドキュメントを参照してください。これは、ストリームを反復的に処理するためのヘルパーです (ユーザーの視点から一度に 1 つずつ、バックグラウンドでチャンクにバッチ処理されます)。

お役に立てれば。

0 - http://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.bulk

于 2014-10-03T05:41:37.950 に答える