3

バージョン の Elastic Cloud の Elastic Search にペア rdd を書き込もうとしてい2.4.0ます。プラグインを使用elasticsearch-spark_2.10-2.4.0して ES に書き込みます。ESへの書き込みに使用しているコードは次のとおりです。

def predict_imgs(r):  
  import json
  out_d = {}
  out_d["pid"] = r["pid"]
  out_d["other_stuff"] = r["other_stuff"]

  return (r["pid"], json.dumps(out_d))

res2 = res1.map(predict_imgs)

es_write_conf = {
"es.nodes" : image_es,
#"es.port" : "9243",
"es.resource" : "index/type",
"es.nodes.wan.only":"True",
"es.write.operation":"upsert",
"es.mapping.id":"product_id",
"es.nodes.discovery" : "false",
"es.net.http.auth.user": "username",
"es.net.http.auth.pass": "pass",
"es.input.json": "true",
"es.http.timeout":"1m",
"es.scroll.size":"10",
"es.batch.size.bytes":"1mb",
"es.http.retries":"1",
"es.batch.size.entries":"5",
"es.batch.write.refresh":"False",
"es.batch.write.retry.count":"1",
"es.batch.write.retry.wait":"10s"}

res2.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf)

私が得るエラーは次のとおりです。

Py4JJavaError: An error occurred while calling     z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

興味深いのは、rdd2 の最初のいくつかの要素を取得し、それから新しい rdd を作成して ES に書き込むと、これが機能することです。問題なく機能します。

x = sc.parallelize([res2.take(1)])
x.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf)

Elastic Cloud (Elastic Search のクラウド サービス) と Databricks (Apache Spark のクラウド サービス) を使用しています。Spark の ES への書き込みのスループットに ES が追いついていないのでしょうか? Elastic Cloud のサイズを 2GB RAM から 8GB RAM に増やしました。

上記で使用した推奨構成はありes_write_confますか? 他confsに考えられることはありますか?ES 5.0 への更新は役に立ちますか?

どんな助けでも大歓迎です。ここ数日間、これに苦労しています。ありがとうございました。

4

1 に答える 1

2

必ずしもelasticsearchの保存プロセスではなく、pysparkの計算に問題があるようです。次の方法で RDD に問題がないことを確認します。

  1. rdd1count()での実行 (結果を「具体化」するため)
  2. rdd2count()での実行

カウントに問題がない場合は、ES に保存する前に結果をキャッシュしてみてください。

res2.cache()
res2.count() # to fill the cache
res2.saveAsNewAPIHadoopFile(...

それでも問題が発生する場合は、デッド エグゼキュータの stderr と stdout を調べてみてください (SparkUI の [Executors] タブで見つけることができます)。

また、 のバッチ サイズが非常に小さいことに気付きましたes_write_conf。パフォーマンスを向上させるには、バッチ サイズを 500 または 1000 に増やしてみてください。

于 2016-11-11T20:34:24.403 に答える