Python 3を使用してPySpark内でelasticsearch-pyクライアントを使用していますが、ESでanalyze()関数をRDDと組み合わせて使用すると問題が発生します。特に、RDD の各レコードはテキストの文字列であり、それを分析してトークン情報を取得しようとしていますが、Spark のマップ関数内で使用しようとするとエラーが発生します。
たとえば、これは完全に正常に機能します。
from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]
{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}
ただし、これを試すと:
trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()
ピクルス化に関連する非常に長いエラーメッセージが表示されます(これで終わりです):
(self, obj) 109if'recursion'in.[0]: 110="""Could not pickle object as excessively deep recursion required."""--> 111 picklePicklingErrormsg
save_memoryviewself obj
: Could not pickle object as excessively deep recursion required.
raise.() 112 113def(,):PicklingError
エラーの意味がわかりません。私は何か間違ったことをしていますか?ES分析機能をRDDのレコードにマップする方法はありますか?
編集:elasticsearch-py の他の関数 (es.termvector() など) を適用するときにも、この動作が発生します。