3

アクティブな Amazon Elasticsearch インスタンスがあり、Chrome から「Sense」を介してステートメントに接続して実行できます。しかし、一括挿入しようとすると、「タイムアウト」エラーが表示されます。Python (一括ヘルパー) と logstash モジュールの両方を試してみましたが、両方の方法で同じエラーが発生しました。

以下は使用したコードです

import psycopg2
from elasticsearch import Elasticsearch, helpers
import time

connection = psycopg2.connect(database='dbname', user='username', password='password', host='abc.def.com', port=5432)
es = Elasticsearch('elasticsearchinstance.amazonaws.com', max_retries=3, retry_on_timeout=True, request_timeout='10m')
cursor = connection.cursor()

query = """
select column1,column2,column3 from table
"""
cursor.execute(query)
rows = cursor.fetchall()
dict_list = []
for i in range(len(rows)):
    dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

print len(dict_list)

es.indices.delete(index='es_index', ignore=[400, 404])

time.sleep(2)

mapping = "{\"settings\" : {\"analysis\" : { \"analyzer\" : { \"my_ngram_analyzer\" : { \"tokenizer\" : \"my_ngram_tokenizer\" }},\"tokenizer\" : {\"my_ngram_tokenizer\" : {\"type\" : \"nGram\" , \"min_gram\" : \"2\" , \"max_gram\" : \"50\" }}}}, \"mappings\": { \"doc\": { \"_id\" : { \"path\" : \"id\" }, \"properties\": { \"column2\": { \"type\": \"string\", \"analyzer\": \"my_ngram_analyzer\" }, \"id\": { \"type\": \"long\" }, \"column3\": { \"type\": \"integer\" }}}}}"
es.indices.create(index='es_index', ignore=400, body=mapping)

helpers.bulk(es, dict_list)

Python Bulk ヘルパーで取得したエラーは次のとおりです。

Traceback (most recent call last):
File "D:\Python\refresh_data.py", line 21, in <module>
es.indices.delete(index='es_index', ignore=[400, 404])
File "C:\Python27\lib\site-packages\elasticsearch\client\utils.py", line 69, in _wrapped
return func(*args, params=params, **kwargs)
File "C:\Python27\lib\site-packages\elasticsearch\client\indices.py", line 198, in delete
params=params)
File "C:\Python27\lib\site-packages\elasticsearch\transport.py", line 307, in perform_request
status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
File "C:\Python27\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 89, in perform_request
raise ConnectionError('N/A', str(e), e)

elasticsearch.exceptions.ConnectionError:
ConnectionError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)')) 
caused by:
ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)'))

Logstash(一括挿入用)でも同様のタイムアウト エラーが発生します(必要に応じて、logstash のエラーを編集および更新します)。

Amazon Elasticsearch Service でこのタイムアウトの問題を解決するための支援が必要です。

前もって感謝します。

編集:

これは、Amazon ES への一括挿入を実行したときに「Logstash」で発生するエラーです。

C:\logstash-1.5.4\bin>logstash agent -f feed_load_amazon_es.conf
io/console not supported; tty will not be manipulated
←[31mFailed to install template: connect timed out {:level=>:error}←[0m
Logstash startup completed
←[31mGot error to send bulk of actions: connect timed out {:level=>:error}←[0m
←[33mFailed to flush outgoing items {:outgoing_count=>3, :exception=>"Manticore::ConnectTimeout", 
:backtrace=>["C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:35:in `initialize'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:70:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:245:in `call_once'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:148:in `code'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:71:in `perform_request'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/base.rb:190:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:54:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/client.rb:119:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-api-1.0.12/lib/elasticsearch/api/actions/bulk.rb:80:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch/protocol.rb:104:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:542:in `submit'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:566:in `flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:219:in `buffer_flush'", 
"org/jruby/RubyHash.java:1341:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:216:in `buffer_flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:600:in `teardown'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:248:in `outputworker'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:247:in `outputworker'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:166:in `start_outputs'"], :level=>:warn}←[0m
4

1 に答える 1

0

あなたはそれを間違っていると思います。

一括リクエストは、一括メソッドの「本文」フィールドの 2 行の組み合わせです。

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }

これがあなたの体のフィールドにあるべきものです。

最初の行には、リクエストのタイプ、バルクするインデックス、および設定できるかどうかの他の多くのパラメータが含まれています(ドキュメントを確認してください)。最初の行の最後に \r\n を追加します。

2 行目には、挿入しようとしているものが含まれている必要があります。

dict_list に何を入れているかを確認すると、index メソッドの呼び出しを忘れています。

間違った構造:

dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

右の構造 :

{ "index" : {'_type':'doc', '_index':'es_index', '_id':rows[i][0]} }

次に、ドキュメントを 2 行目に追加します。

于 2015-12-07T10:25:35.000 に答える