マルチプロセッシングスクリプトでelasticsearch-pyを使用する正しい方法は何ですか? プロセスを開始する前に新しいクライアント オブジェクトを作成してそのオブジェクトを使用するか、各プロセス内に新しいオブジェクトを作成する必要があります。2 つ目は、elasticsearch からの接続の問題でエラーが発生します
ありがとうキラン
マルチプロセッシングスクリプトでelasticsearch-pyを使用する正しい方法は何ですか? プロセスを開始する前に新しいクライアント オブジェクトを作成してそのオブジェクトを使用するか、各プロセス内に新しいオブジェクトを作成する必要があります。2 つ目は、elasticsearch からの接続の問題でエラーが発生します
ありがとうキラン
クライアントオブジェクトをグローバル変数として宣言すると、最初の方法がうまくいくようです。
from multiprocessing import Pool
from elasticsearch import Elasticsearch
import time
def task(body):
result = es.index(index='test', doc_type='test', body=body)
return result
def main():
pool = Pool(processes=MAX_CONNECTS)
result = []
for x in range(10):
result.append(pool.apply_async(task, ({'id': x},)))
time.sleep(1)
for rs in result:
print(rs.get())
if __name__ == "__main__":
MAX_CONNECTS = 5
es = Elasticsearch(hosts="localhost", maxsize=MAX_CONNECTS)
main()
出力は次のようになります
{'_index': 'test', '_type': 'test', '_id': 'xEjqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'w0jqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'x0jqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xkjqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xUjqBWcB9xsUYKqz-P6W', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yEjqBWcB9xsUYKqz-P66', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'ykjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yUjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'y0jqBWcB9xsUYKqz-P7P', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'zEjqBWcB9xsUYKqz-P7V', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 5, '_primary_term': 1}
推奨される方法は、一意のクライアント オブジェクトを作成することです。また、を使用して同時スレッドの数を増やすことができますmaxsize
(既定では 10)。
es = Elasticsearch( "host1", maxsize=25)