4

私はスパークとカサンドラの初心者です。以下のように、spark-cassandra コネクタを使用して cassandra テーブルに挿入しようとしています。

import java.util.UUID

import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._

case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)

object SparkConnectorContext {
  val conf = new SparkConf(true).setMaster("local")
    .set("spark.cassandra.connection.host", "192.168.xxx.xxx")
  val sc = new SparkContext(conf)
}
object TestRepo {
  def insertList(list: List[TestEntity]) = {
    SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
  }
}
object TestApp extends App {
  val start = System.currentTimeMillis()
  TestRepo.insertList(Utility.generateRandomData())
  val end = System.currentTimeMillis()
  val timeDiff = end-start
  println("Difference (in millis)= "+timeDiff)
}

上記の方法(100個のエンティティを含むリスト)を使用して挿入すると、300-1100 milliseconds. ファントムライブラリを使用して同じデータを挿入しようとしました。未満しかかかっていません20-40 milliseconds

スパークコネクタの挿入にこれほど時間がかかる理由を誰か教えてもらえますか? コードで何か間違ったことをしていますか、それとも挿入操作にspark-cassandra コネクタを使用することはお勧めできませんか?

4

2 に答える 2

2

「ベンチマーク」にはいくつかの深刻な問題があります。

  1. データ セットが非常に小さいため、ほとんどの場合、ジョブのセットアップ時間のみを測定しています。100 個のエンティティを保存するには、1 つのノードで数秒ではなく、1 ミリ秒のオーダーにする必要があります。また、100 個のエンティティを保存すると、JVM は実行するコードを最適化されたマシン コードにコンパイルする機会がなくなります。
  2. 測定にスパーク コンテキストの初期化を含めました。JVM はクラスを遅延ロードするため、スパーク初期化のコードは、測定が開始された後に実際に呼び出されます。これは非常にコストのかかる要素であり、通常、ジョブごとではなく、spark アプリケーション全体で 1 回だけ実行されます。
  3. 起動ごとに 1 回だけ測定を実行しています。これは、JVM が初めてすべてのクラスをロードする必要があり、Hotspot が起動する可能性がないため、spark ctx のセットアップとジョブのセットアップ時間を誤って測定していることを意味します。

要約すると、ほとんどの場合、ロードされたクラスのサイズと数に依存するクラスのロード時間を測定している可能性が非常に高くなります。Spark はロードするのに非常に大きなものであり、数百ミリ秒であってもまったく驚くべきことではありません。

挿入のパフォーマンスを正しく測定するには:

  • より大きなデータセットを使用する
  • 測定から 1 回限りのセットアップを除外する
  • 安定した状態のパフォーマンスに達するまで、同じスパーク コンテキストを共有する複数の実行を行い、最初のいくつかの実行を破棄します。

ところで、デバッグ ログ レベルを有効にすると、コネクタはすべてのパーティションの挿入時間をエグゼキュータ ログに記録します。

于 2015-08-12T18:24:53.277 に答える