1

私は Apache Spark の使用方法を学ぼうとしてきましたが、(datastax spark-cassandra-connector を使用して) Cassandra から列のすべての値を合計しようとすると問題が発生します。私が試したすべての結果はjava.lang.OutOfMemoryError: Java heap spaceになります。

スパークマスターに提出するコードは次のとおりです。

object Benchmark {
  def main( args: Array[ String ] ) {
    val conf    = new SparkConf()
                  .setAppName( "app" )
                  .set( "spark.cassandra.connection.host", "ec2-blah.compute-1.amazonaws.com" )
                  .set( "spark.cassandra.auth.username", "myusername" )
                  .set( "spark.cassandra.auth.password", "mypassword" )
                  .set( "spark.executor.memory", "4g" )
    val sc      = new SparkContext( conf )
    val tbl     = sc.cassandraTable( "mykeyspace", "mytable" )
    val res     = tbl.map(_.getFloat("sclrdata")).sum()

    println( "sum = " + res )
  }
}

現在、クラスターには 1 つの Spark ワーカー ノードしかありません。テーブルのサイズを考えると、一度にすべてをメモリに収めることができない可能性は間違いありません。ただし、spark はコマンドを遅延評価することになっているため、これが問題になるとは思いませんでした。列内のすべての値を合計しても、テーブル全体が一度にメモリに存在する必要はありません。

私はこのトピックの初心者なので、なぜこれがうまくいかないのか、正しく行う方法を教えていただければ幸いです。

ありがとう

4

1 に答える 1

1

おそらく、spark はテーブル全体を単一のインメモリ パーティションとして構築し、マッピング操作を実行できるようにします。

Spark は OutOfMemoryExceptions をスローするのではなくディスクにスピルするはずだと思っていましたが、パーティションが 1 つしかない場合はスピルできない可能性があります。ここで同様の問題を見ましたが、彼は次のように分割サイズを指定して解決しました。

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

conf で spark.cassandra.input.split.size_in_mb を設定してみてください。

これにより、スパークがテーブルのチャンクを合計し、新しいチャンク用のスペースが必要になったときにそれらのチャンクをメモリから追い出すことができると思います。

他に検討できることは、ディスクへのスピルを許可するテーブル RDD のストレージ レベルを指定することです。「.persist(StorageLevel.MEMORY_AND_DISK)」を追加することでこれを行うことができると思います。デフォルトは MEMORY_ONLY のようです。ストレージ レベルの詳細については、こちらの RDD 永続性セクションを参照してください。

于 2015-07-10T13:12:08.017 に答える