Spark RDD にアクセスする際のクロージャーでのローカル変数の使用について質問があります。解決したい問題は次のようになります。
RDD に読み込む必要があるテキストファイルのリストがあります。ただし、まず、単一のテキスト ファイルから作成された RDD に追加情報を追加する必要があります。この追加情報は、ファイル名から抽出されます。次に、union() を使用して、RDD を 1 つの大きな RDD に入れます。
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
tmp_rdd = spark_context.textFile(filename)
# extract_file_info('file_from_Owner.txt') == 'Owner'
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work:
# The result is that always Bert will be the owner, i.e., never Ernie.
問題は、ループ内の map() 関数が「正しい」file_owner を参照していないことです。代わりに、file_owner の最新の値を参照します。私のローカル マシンでは、単一の RDD ごとに cache() 関数を呼び出すことで問題を解決できました。
# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..
私の質問: cache() を使用することは、私の問題に対する正しい解決策ですか? 代替手段はありますか?
どうもありがとう!