各パーティションの要素がすべて単一の大きなオブジェクトへのアクセスを共有するスパークRDDを永続化しようとしています。ただし、このオブジェクトはメモリに数回格納されるようです。私の問題を、200 要素しかない単一のパーティションのおもちゃのケースにまで減らします。
val nElements = 200
class Elem(val s:Array[Int])
val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
val sharedArray = Array.ofDim[Int](10000000) // Should require ~40MB
(1 to nElements).toIterator.map(i => new Elem(sharedArray))
}).cache()
rdd.count() //force computation
ログに示されているように、これは予想される量のメモリを消費します。
storage.MemoryStore: ブロック rdd_1_0 が値としてメモリに保存されます (推定サイズ 38.2 MB、空き容量 5.7 GB)
ただし、200 は、これが該当する要素の最大数です。nElements=201
利回りの設定:
storage.MemoryStore: ブロック rdd_1_0 が値としてメモリに保存されます (推定サイズ 76.7 MB、空き容量 5.7 GB)
これは何が原因ですか?このマジック ナンバー 200 はどこから来て、どうすれば増やすことができますか?
明確化のための編集:
関数に println を追加すると、関数が実際に 1 回だけ呼び出されることがわかります。さらに、次を実行します。
rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max // returns true
..10000000 個の要素すべてが実際に同じオブジェクトを指していることを明らかにしているため、データ構造は本質的に正しく動作します。問題が発生するのは、nExamples がはるかに大きい (たとえば 20000) 場合であり、永続化できません。
storage.MemoryStore: rdd_1_0 をメモリにキャッシュするのに十分なスペースがありません! (これまでに計算された 6.1 GB)
設定すると、推定サイズ 1907.4 MBnExamples=500
を示すメモリ内の rdd が正常に保持されますが、メモリ使用量の実際の増加はこれよりもはるかに少ないことがわかります。