5

Spark SQL + Cassandra を使い始めたばかりで、おそらく何か重要なものが欠けていますが、1 つの単純なクエリには約 45 秒かかります。私はcassanda-spark-connectorライブラリを使用しており、Spark もホストするローカル Web サーバーを実行しています。したがって、私のセットアップはおおよそ次のようになります。

sbt で:

    "org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))

コードには、ホストするシングルトンがありSparkContextCassandraSQLContetx. その後、サーブレットから呼び出されます。シングルトン コードは次のようになります。

object SparkModel {

  val conf =
    new SparkConf()
      .setAppName("core")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")

  val sc = new SparkContext(conf)
  val sqlC = new CassandraSQLContext(sc)
  sqlC.setKeyspace("core")

  val df: DataFrame = sqlC.cassandraSql(
    "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")
}

そして、ここで私がそれをどのように使用するか:

get("/spark") {
  SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}

Cassandra、Spark、および Web アプリは、まともな仕様の Macbook Pro の仮想マシンの同じホストで実行されます。Cassandra クエリ自体には 10 ~ 20 ミリ秒かかります。

このエンドポイントを初めて呼び出すと、結果が返されるまでに 70 ~ 80 秒かかります。後続のクエリには最大 45 秒かかります。その後の操作のログは次のようになります。

12:48:50 INFO  org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Missing parents: List()
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
12:48:50 INFO  o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
12:48:50 INFO  org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
12:48:50 INFO  o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
12:48:50 INFO  o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
12:48:50 INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
12:48:50 INFO  com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
12:49:35 INFO  o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
12:49:35 INFO  o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool 
12:49:35 INFO  o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s

ログからわかるように、最長の一時停止は次の 3 行 (21 + 24 秒) の間です。

12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver

どうやら、私は何か間違ったことをしている。あれは何でしょう?どうすればこれを改善できますか?

編集:重要な追加: テーブルのサイズは小さい ( の場合は ~200 エントリ、 のtracking_events場合は ~20 customers) ため、それら全体をメモリに読み込むのにそれほど時間はかかりません。そして、これはローカルの Cassandra インストールであり、クラスターもネットワークも関係ありません。

4

1 に答える 1

4
  "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")

このクエリは、tracking_events テーブルと customers テーブルの両方からすべてのデータを読み取ります。パフォーマンスを、両方のテーブルで SELECT COUNT(*) を実行する場合と比較します。大幅に異なる場合は問題がある可能性がありますが、これは両方のテーブルを完全にメモリに読み込むのにかかる時間にすぎないと思います。

読み取りがどのように行われるかを調整するためのいくつかのノブがあり、デフォルトははるかに大きなデータセットに向けられているため、これらを変更することができます.

spark.cassandra.input.split.size_in_mb  approx amount of data to be fetched into a Spark partition  64 MB
spark.cassandra.input.fetch.size_in_rows    number of CQL rows fetched per driver request   1000

すべてのリソースを活用できるように、(少なくとも) コアと同じ数のタスクを生成していることを確認します。これを行うには、input.split.size を縮小します

フェッチ サイズは、executor コアによって一度にページングされる行数を制御するため、これを増やすと、一部のユース ケースでは速度が向上します。

于 2015-08-17T16:54:18.597 に答える