4

マルチプロセッシングスクリプトでelasticsearch-pyを使用する正しい方法は何ですか? プロセスを開始する前に新しいクライアント オブジェクトを作成してそのオブジェクトを使用するか、各プロセス内に新しいオブジェクトを作成する必要があります。2 つ目は、elasticsearch からの接続の問題でエラーが発生します

ありがとうキラン

4

2 に答える 2

3

クライアントオブジェクトをグローバル変数として宣言すると、最初の方法がうまくいくようです。

from multiprocessing import Pool
from elasticsearch import Elasticsearch
import time


def task(body):
    result = es.index(index='test', doc_type='test', body=body)
    return result


def main():
    pool = Pool(processes=MAX_CONNECTS)
    result = []
    for x in range(10):
        result.append(pool.apply_async(task, ({'id': x},)))
    time.sleep(1)
    for rs in result:
        print(rs.get())


if __name__ == "__main__":
    MAX_CONNECTS = 5
    es = Elasticsearch(hosts="localhost", maxsize=MAX_CONNECTS)
    main()

出力は次のようになります

{'_index': 'test', '_type': 'test', '_id': 'xEjqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'w0jqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'x0jqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xkjqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xUjqBWcB9xsUYKqz-P6W', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yEjqBWcB9xsUYKqz-P66', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'ykjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yUjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'y0jqBWcB9xsUYKqz-P7P', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'zEjqBWcB9xsUYKqz-P7V', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 5, '_primary_term': 1}
于 2018-11-12T03:21:41.113 に答える
0

推奨される方法は、一意のクライアント オブジェクトを作成することです。また、を使用して同時スレッドの数を増やすことができますmaxsize(既定では 10)。

es = Elasticsearch( "host1", maxsize=25)

ソース

于 2016-12-28T14:41:45.233 に答える