6

Spark RDD にアクセスする際のクロージャーでのローカル変数の使用について質問があります。解決したい問題は次のようになります。

RDD に読み込む必要があるテキストファイルのリストがあります。ただし、まず、単一のテキスト ファイルから作成された RDD に追加情報を追加する必要があります。この追加情報は、ファイル名から抽出されます。次に、union() を使用して、RDD を 1 つの大きな RDD に入れます。

from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)   
    tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work: 
# The result is that always Bert will be the owner, i.e., never Ernie.

問題は、ループ内の map() 関数が「正しい」file_owner を参照していないことです。代わりに、file_owner の最新の値を参照します。私のローカル マシンでは、単一の RDD ごとに cache() 関数を呼び出すことで問題を解決できました。

# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..

私の質問: cache() を使用することは、私の問題に対する正しい解決策ですか? 代替手段はありますか?

どうもありがとう!

4

4 に答える 4

1

したがって、実行している cache() メソッドは必ずしも 100% の時間で機能するとは限りません。ノードに障害がなく、パーティションを再計算する必要がない場合は機能します。簡単な解決策は、file_owner の値を「キャプチャ」する関数を作成することです。これは、潜在的な解決策の pyspark シェルの簡単な図です。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0-SNAPSHOT
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>> hi = "hi"
>>> sc.parallelize(["panda"])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:365
>>> r = sc.parallelize(["panda"])
>>> meeps = r.map(lambda x : x + hi)
>>> hi = "by"
>>> meeps.collect()
['pandaby']
>>> hi = "hi"
>>> def makeGreetFunction(param):
...     return (lambda x: x + param)
... 
>>> f = makeGreetFunction(hi)
>>> hi="by"
>>> meeps = r.map(f)
>>> meeps.collect()
['pandahi']
>>> 
于 2015-01-29T00:15:03.507 に答える
0

ファイル所有者の配列を作成し、それをマップ変換で使用できます。

file_owner[i] = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner[i]))
于 2015-01-29T08:00:26.633 に答える
0

他の人が説明したように、ラムダ関数の問題はfile_owner、実行時に評価されることです。for ループの反復中にその評価を強制するには、構築関数を作成して実行する必要があります。ラムダでそれを行う方法は次のとおりです。

# ...
file_owner = extract_file_info(filename)   
tmp_rdd = tmp_rdd.map((lambda owner: lambda line: (line,owner))(file_owner))
# ...
于 2015-02-04T18:07:42.547 に答える