1

私の以前の質問に基づいて、Spark と Python は RDD の入力としてカスタム ファイル形式/ジェネレーターを使用します。基本的に sc.textFile() で入力を解析し、自分のまたはいくつかのライブラリ カスタム関数を使用して解析できるはずです。

今、特に gensim フレームワークを使用してウィキペディアのダンプを解析しようとしています。マスター ノードとすべてのワーカー ノードに gensim を既にインストールしており、この質問List (or iterator) of tuples returned by MAP (PySpark) に触発されたウィキペディア ページを解析するために、gensim ビルドイン関数を使用したいと考えています。

私のコードは次のとおりです。

import sys
import gensim
from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)

    sc = SparkContext(appName="Process wiki - distributed RDD")

    distData = sc.textFile(sys.argv[1])
    #take 10 only to see how the output would look like
    processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)

    print processed_data
    sc.stop()

extract_pages のソース コードはhttps://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.pyで見つけることができ、私の経験に基づいて、Spark で動作するはずです。

残念ながら、コードを実行すると、次のエラー ログが表示されます。

14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <ip address>.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
yield next(iterator)
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages
elems = (elem for _, elem in iterparse(f, events=("end",)))
File "<string>", line 52, in __init__
IOError: [Errno 2] No such file or directory: u'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en">'
    org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
    org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
    org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

そして、おそらく Spark ログ:

14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Spark なしでこれをうまく試したので、問題は Spark と gensim の組み合わせのどこかにあるはずですが、私が得ているエラーはよくわかりません。gensim wikicorpus.py の 190 行目にファイルの読み取りが見られません。

編集:

Spark からいくつかのログを追加しました。

EDIT2:

gensim は from xml.etree.cElementTree import iterparse, documentation hereを使用しており、これが問題を引き起こす可能性があります。実際には、xml データを含むファイル名またはファイルが必要です。RDD は xml データを含むファイルと見なすことができますか?

4

1 に答える 1

1

私は通常、Scala で Spark を使用します。それにもかかわらず、ここに私の考えがあります:

sc.textFile を介してファイルをロードすると、sparkWorker 全体に分散されるある種の行反復子になります。ウィキペディアの xml 形式を考えると、1 行が解析可能な xml アイテムに必ずしも対応していないため、この問題が発生しています。

すなわち:

 Line 1 :  <item>
 Line 2 :  <title> blabla </title> <subitem>
 Line 3 : </subItem>
 Line 4 : </item>

各行を単独で解析しようとすると、取得したような例外が吐き出されます。

通常、ウィキペディアのダンプをいじる必要があるため、最初に行うことは、Spark で簡単に消化できる "読み取り可能なバージョン" に変換することです。例: 記事のエントリごとに 1 行。このようになったら、簡単に Spark にフィードして、あらゆる種類の処理を行うことができます。変換するのに多くのリソースは必要ありません

ReadableWiki をご覧ください: https://github.com/idio/wiki2vec

于 2015-02-19T13:09:57.250 に答える