0

Zipkin Dependencies Spark ジョブを最適化して、実行するステップ数を最小限に抑えることで、より少ないステージで実行しようとしreduceByKeyています。データは次の表から読み取られます。

CREATE TABLE IF NOT EXISTS zipkin.traces (
    trace_id  bigint,
    ts        timestamp,
    span_name text,
    span      blob,
    PRIMARY KEY (trace_id, ts, span_name)
)

そこでは、単一のパーティションtrace_idに完全なトレースが含まれ、数行から数百行が含まれます。ただし、そのパーティション全体が Spark ジョブによって非常に単純な に変換され、RDD[((String, String), Long)]エントリの数が数十億からわずか数百に減少します。

残念ながら、現在のコードは、すべての行を個別に読み取ることでそれを行っています。

sc.cassandraTable(keyspace, "traces")

reduceByKeyを考え出すために2 つのステップを使用しますRDD[((String, String), Long)]。1 つの Spark ワーカー プロセスでパーティション全体を一度に読み取り、すべてをメモリ内で処理する方法があれば、速度が大幅に向上し、現在のサーバーから出てくる膨大なデータ セットを保存/ストリーミングする必要がなくなります。最初の段階。

- 編集 -

明確にするために、ジョブはテーブルから数十億のパーティションからすべてのデータを読み取る必要があります。

4

1 に答える 1

1

シャッフルを行わずにすべてのパーティション データを同じ Spark ワーカーに保持するための鍵は、次を使用することです。spanByKey

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts));

sc.cassandraTable("test", "events")
  .spanBy(row => (row.getInt("year"), row.getInt("month")))

sc.cassandraTable("test", "events")
  .keyBy(row => (row.getInt("year"), row.getInt("month")))
  .spanByKey

シャッフルがない場合は、すべての変更がその場で行われ、反復子として一緒にパイプライン化されます。

次の注意事項に注意してください。

注: これは、順番に並べられたデータに対してのみ機能します。データはクラスタリング キーによって Cassandra で順序付けられるため、実行可能なすべてのスパンは自然なクラスタリング キーの順序に従う必要があります。

于 2016-03-31T00:38:54.440 に答える