9

次のようなコードを実行すると:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())

Yarn のステージを見てみると、Spark が DAG 計算を 2 回実行していることに気付きました。1 回目は RDD を具体化してそれをキャッシュする個別 + カウントで、次に完全に 2 回目でチェックポイントされたコピーを作成しました。

RDD は既に実体化され、キャッシュされているのに、なぜチェックポイントは単純にこれを利用して、キャッシュされたパーティションをディスクに保存しないのでしょうか?

Sparkにこれを利用させ、操作を1回だけ実行させ、チェックポイントで物事をコピーする既存の方法(何らかの構成設定またはコード変更)はありますか?

代わりに、2 回「実体化」する必要がありますか?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint
print(newRDD.count())

これを機能リクエストにするために、Apache Spark Jira チケットを作成しました: https://issues.apache.org/jira/browse/SPARK-8666

4

3 に答える 3

7

これは既知の問題のようです。古い JIRA チケット ( https://issues.apache.org/jira/browse/SPARK-8582 ) を参照してください。

于 2015-06-26T17:01:48.187 に答える