私の以前の質問に基づいて、Spark と Python は RDD の入力としてカスタム ファイル形式/ジェネレーターを使用します。基本的に sc.textFile() で入力を解析し、自分のまたはいくつかのライブラリ カスタム関数を使用して解析できるはずです。
今、特に gensim フレームワークを使用してウィキペディアのダンプを解析しようとしています。マスター ノードとすべてのワーカー ノードに gensim を既にインストールしており、この質問List (or iterator) of tuples returned by MAP (PySpark) に触発されたウィキペディア ページを解析するために、gensim ビルドイン関数を使用したいと考えています。
私のコードは次のとおりです。
import sys
import gensim
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount <file>"
exit(-1)
sc = SparkContext(appName="Process wiki - distributed RDD")
distData = sc.textFile(sys.argv[1])
#take 10 only to see how the output would look like
processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
print processed_data
sc.stop()
extract_pages のソース コードはhttps://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.pyで見つけることができ、私の経験に基づいて、Spark で動作するはずです。
残念ながら、コードを実行すると、次のエラー ログが表示されます。
14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <ip address>.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
yield next(iterator)
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages
elems = (elem for _, elem in iterparse(f, events=("end",)))
File "<string>", line 52, in __init__
IOError: [Errno 2] No such file or directory: u'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en">'
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
そして、おそらく Spark ログ:
14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296
と
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Spark なしでこれをうまく試したので、問題は Spark と gensim の組み合わせのどこかにあるはずですが、私が得ているエラーはよくわかりません。gensim wikicorpus.py の 190 行目にファイルの読み取りが見られません。
編集:
Spark からいくつかのログを追加しました。
EDIT2:
gensim は from xml.etree.cElementTree import iterparse
, documentation hereを使用しており、これが問題を引き起こす可能性があります。実際には、xml データを含むファイル名またはファイルが必要です。RDD は xml データを含むファイルと見なすことができますか?