次のようなコードを実行すると:
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
Yarn のステージを見てみると、Spark が DAG 計算を 2 回実行していることに気付きました。1 回目は RDD を具体化してそれをキャッシュする個別 + カウントで、次に完全に 2 回目でチェックポイントされたコピーを作成しました。
RDD は既に実体化され、キャッシュされているのに、なぜチェックポイントは単純にこれを利用して、キャッシュされたパーティションをディスクに保存しないのでしょうか?
Sparkにこれを利用させ、操作を1回だけ実行させ、チェックポイントで物事をコピーする既存の方法(何らかの構成設定またはコード変更)はありますか?
代わりに、2 回「実体化」する必要がありますか?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint
print(newRDD.count())
これを機能リクエストにするために、Apache Spark Jira チケットを作成しました: https://issues.apache.org/jira/browse/SPARK-8666