0

Python 3を使用してPySpark内でelasticsearch-pyクライアントを使用していますが、ESでanalyze()関数をRDDと組み合わせて使用​​すると問題が発生します。特に、RDD の各レコードはテキストの文字列であり、それを分析してトークン情報を取得しようとしていますが、Spark のマップ関数内で使用しようとするとエラーが発生します。

たとえば、これは完全に正常に機能します。

from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]

{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}

ただし、これを試すと:

trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()

ピクルス化に関連する非常に長いエラーメッセージが表示されます(これで終わりです):

(self, obj)    109if'recursion'in.[0]:    110="""Could not pickle object as excessively deep recursion required."""--> 111                  picklePicklingErrormsg

  save_memoryviewself obj

: Could not pickle object as excessively deep recursion required.

raise.()    112    113def(,):PicklingError

エラーの意味がわかりません。私は何か間違ったことをしていますか?ES分析機能をRDDのレコードにマップする方法はありますか?

編集:elasticsearch-py の他の関数 (es.termvector() など) を適用するときにも、この動作が発生します。

4

1 に答える 1

1

基本的に、Elasticsearchクライアントはシリアライズ可能ではありません。したがって、パーティションごとにクライアントのインスタンスを作成し、それらを処理する必要があります。

def get_tokens(part): es = Elasticsearch() yield [es.indices.analyze(text=x)['tokens'][0] for x in part] rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2) rdd.mapPartitions(lambda p: get_tokens(p)).collect()

次の結果が得られるはずです。 Out[17]: [[{u'end_offset': 3, u'position': 1, u'start_offset': 0, u'token': u'the', u'type': u'<ALPHANUM>'}], [{u'end_offset': 5, u'position': 1, u'start_offset': 0, u'token': u'brown', u'type': u'<ALPHANUM>'}]]

大規模なデータ セットの場合、データセット内の各要素に対して ES への REST 呼び出しが含まれるため、これは非常に非効率的であることに注意してください。

于 2015-08-24T10:38:37.150 に答える