1

データが十分に大きく、一度メモリにキャッシュできない場合、cassandraからsparkにいくつかのデータを取得します.spark.cassandra.input.split.size_in_mbを使用して、マシンが一度に取得できるデータの大きさを設定する必要があります.しかしまた、を使用してより多くの時間データをキャッシュしたい ,コードは次のようになります:

val conf = new SparkConf().setAppName("CassandraLogAnalyse")
  .set("spark.cassandra.connection.host", "xxx")
  .set("spark.cassandra.auth.username", "xxx")
  .set("spark.cassandra.auth.password", "xxx")
  .set("spark.cassandra.input.split.size_in_mb",'512')
//Select Data from cassandra
val sc = new SparkContext(conf)

val loggly_http_in = sc.cassandraTable("loggly", "http_in").select("uid", "cjj_id", "request_uri", "request_body").where("app_context = ? and log_time > ?", "news", batch_time)

loggly_http_in.cache()

val rdd1 = loggly_http_in.map(...).filter(...)......
val rdd2 = loggly_http_in.map(...).filter(...)......

それが正しいか?それが正しい場合、それはどのように機能しますか?それが間違っているとき、正しい方法は何ですか?

4

1 に答える 1

1

spark.cassandra.input.split.size_in_mb設定はキャッシュとは関係ありません。この設定により、各 Spark パーティションの大きさが決まります。設定が大きすぎると、タスクが少なすぎて、一部のノードが未使用のままになる可能性があります。設定が低すぎると、タスク スケジューリングのオーバーヘッドが大きくなります。

Spark は、RDD の複数のパーティション (および複数の RDD) をキャッシュできます。したがって、 を呼び出すとcache()、空きメモリを見つけられる限り多くの RDD パーティションをキャッシュしようとします。実際のキャッシュよりも多くをキャッシュする必要がある場合、唯一の方法は、より多くの Spark クラスター メモリをアプリケーションに割り当てることです。

の使い方cacheが良さそうです。

変換された RDD をキャッシュすることもできることを忘れないでください。たとえば、フィルタリング後に RDD をキャッシュすると、Cassandra から取得した元の RDD をキャッシュするよりもメモリが少なくて済みます。

于 2015-12-07T13:02:52.887 に答える