4

各パーティションの要素がすべて単一の大きなオブジェクトへのアクセスを共有するスパークRDDを永続化しようとしています。ただし、このオブジェクトはメモリに数回格納されるようです。私の問題を、200 要素しかない単一のパーティションのおもちゃのケースにまで減らします。

val nElements = 200
class Elem(val s:Array[Int])

val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
    val sharedArray = Array.ofDim[Int](10000000) // Should require ~40MB
    (1 to nElements).toIterator.map(i => new Elem(sharedArray))
}).cache()

rdd.count() //force computation    

ログに示されているように、これは予想される量のメモリを消費します。

storage.MemoryStore: ブロック rdd_1_0 が値としてメモリに保存されます (推定サイズ 38.2 MB、空き容量 5.7 GB)

ただし、200 は、これが該当する要素の最大数です。nElements=201利回りの設定:

storage.MemoryStore: ブロック rdd_1_0 が値としてメモリに保存されます (推定サイズ 76.7 MB、空き容量 5.7 GB)

これは何が原因ですか?このマジック ナンバー 200 はどこから来て、どうすれば増やすことができますか?


明確化のための編集

関数に println を追加すると、関数が実際に 1 回だけ呼び出されることがわかります。さらに、次を実行します。

rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max  // returns true

..10000000 個の要素すべてが実際に同じオブジェクトを指していることを明らかにしているため、データ構造は本質的に正しく動作します。問題が発生するのは、nExamples がはるかに大きい (たとえば 20000) 場合であり、永続化できません。

storage.MemoryStore: rdd_1_0 をメモリにキャッシュするのに十分なスペースがありません! (これまでに計算された 6.1 GB)

設定すると、推定サイズ 1907.4 MBnExamples=500を示すメモリ内の rdd が正常に保持されますが、メモリ使用量の実際の増加はこれよりもはるかに少ないことがわかります。

4

1 に答える 1

0

将来これに出くわす人のために、私は最終的に非常にハックな解決策を思いつきました(ただし、より良い解決策を聞いてうれしいです)。rdd.cache() を使用する代わりに、次のように定義します。

def cached[T: ClassTag](rdd:RDD[T]) = {
    rdd.mapPartitions(p => 
        Iterator(p.toSeq)
    ).cache().mapPartitions(p =>
        p.next().toIterator
    )
}

cached(rdd)「キャッシュされた」リストから生成されたRDDを返すように

于 2014-12-09T23:55:25.383 に答える