2

常に 6 つのタスクを実行する 5 ノードの Cassandra クラスターから 500m 行を読み取る単純な Spark ジョブがあり、各タスクのサイズが原因で書き込みの問題が発生しています。input_split_size を調整してみましたが、効果がないようです。現時点では、テーブル スキャンの再分割を余儀なくされていますが、これはコストがかかるため理想的ではありません。

いくつかの投稿を読んだ後、起動スクリプト (以下) で num-executors を増やそうとしましたが、効果はありませんでした。

Cassandra テーブル スキャンでタスクの数を設定する方法がない場合は、それで問題ありません..しかし、ここで何かが欠けているのではないかと常に感じています。

Spark ワーカーは、それぞれに 2 TB SSD を備えた 8 コア、64 GB サーバーである C* ノード上に存在します。

...
val conf = new SparkConf(true).set("spark.cassandra.connection.host",
cassandraHost).setAppName("rowMigration")
  conf.set("spark.shuffle.memoryFraction", "0.4")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.set("spark.executor.memory", "15G")
  conf.set("spark.cassandra.input.split.size_in_mb", "32") //default 64mb
  conf.set("spark.cassandra.output.batch.size.bytes", "1000") //default
  conf.set("spark.cassandra.output.concurrent.writes", "5") //default

val sc = new SparkContext(conf)

val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
  .select("accountid", "userid", "eventname", "eventid", "eventproperties")
  .filter(row=>row.getString("accountid").equals("someAccount"))
  .repartition(100)

val object = rawEvents
  .map(ele => (ele.getString("userid"),
    UUID.randomUUID(),
    UUID.randomUUID(),
    ele.getUUID("eventid"),
    ele.getString("eventname"),
    "event type",
    UUIDs.unixTimestamp(ele.getUUID("eventid")),
    ele.getMap[String, String]("eventproperties"),
    Map[String, String](),
    Map[String, String](),
    Map[String, String]()))
  .map(row=>MyObject(row))

Object.saveToCassandra(targetCassandraKeyspace,eventTable)

起動スクリプト:

#!/bin/bash
export SHADED_JAR="Migrate.jar"
export SPARKHOME="${SPARKHOME:-/opt/spark}"
export SPARK_CLASSPATH="$SHADED_JAR:$SPARK_CLASSPATH"
export CLASS=com.migration.migrate
"${SPARKHOME}/bin/spark-submit" \
        --class "${CLASS}" \
        --jars $SHADED_JAR,$SHADED_JAR \
        --master spark://cas-1-5:7077  \
        --num-executors 15 \
        --executor-memory 20g \
        --executor-cores 4 "$SHADED_JAR" \
        --worker-cores 20 \
        -Dcassandra.connection.host=10.1.20.201 \
        -Dzookeeper.host=10.1.20.211:2181 \

編集 - Piotrの答えに続いて:

sc.cassandraTable の ReadConf.splitCount を次のように設定しましたが、生成されるタスクの数は変わりません。つまり、テーブル スキャンを再分割する必要があります。私はこれについて間違った考えをしていて、再分割が必要であると考え始めています。現在、このジョブには約 1.5 時間かかっています。テーブル スキャンをそれぞれ約 10 MB の 1000 のタスクに再分割すると、書き込み時間が数分に短縮されました。

val cassReadConfig = new ReadConf {
      ReadConf.apply(splitCount = Option(1000)
        )
    }

    val sc = new SparkContext(conf)

    val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
    .withReadConf(readConf = cassReadConfig)
4

2 に答える 2

3

Spark コネクタ 1.3 以降、分割サイズは、Cassandra 2.1.5 以降で利用可能な system.size_estimates Cassandra テーブルに基づいて推定されます。このテーブルは Cassandra によって定期的に更新され、新しいデータのロード/削除または新しいノードへの参加の直後に、その内容が正しくない可能性があります。そこにある見積もりがデータ量を反映しているかどうかを確認してください。これは比較的新しい機能であるため、バグが存在する可能性も十分にあります。

見積もりが間違っている場合、または古い Cassandra を実行している場合、分割サイズの自動調整をオーバーライドする機能が残されています。sc.cassandraTable は、固定数の分割を強制する splitCount を設定できる ReadConf パラメータを受け取ります。

split_size_in_mb パラメーターに関しては、確かにプロジェクト ソースにしばらくの間バグがありましたが、maven に公開されたバージョンにリリースされる前に修正されました。そのため、(古い) ソースからコネクタをコンパイルしない限り、ヒットしないでください。

于 2015-07-29T06:03:20.867 に答える