1

DStream[String, Int単語数のペアを持つ] があり("hello" -> 10)ます。これらのカウントをステップ インデックスを使用して cassandra に書き込みたいと考えています。インデックスは次のように初期化されvar step = 1、マイクロバッチが処理されるたびに増分されます。

次のように作成された cassandra テーブル:

CREATE TABLE wordcounts (
    step int,
    word text,
    count int,
primary key (step, word)
);

ストリームをテーブルに書き込もうとすると...

stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))

...わかりjava.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: stepました。

step3 つの列をまとめて書き込むために、ストリームの先頭にインデックスを追加するにはどうすればよいですか?

私はspark 2.0.0、scala 2.11.8、cassandra 3.4.0、およびspark-cassandra-connector 2.0.0-M3を使用しています。

4

3 に答える 3

1

前述のように、Cassandra テーブルは form の何かを想定していますが(Int, String, Int)、wordCount DStream は typeDStream[(String, Int)]であるため、 への呼び出しが機能するには、 of typesaveToCassandra(...)が必要です。DStreamDStream[(Int, String, Int)]

この質問で難しいのは、定義上、ドライバーでのみ認識されるローカル カウンターを DStream のレベルまで引き上げる方法です。

そのためには、カウンターを分散レベル (Spark では "RDD" または "DataFrame" を意味します) に "リフト" し、その値を既存のDStreamデータと結合する必要があります。

従来のストリーミング ワード カウントの例とは異なります。

// Split each line into words
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

マイクロバッチの数を保持するローカル変数を追加します。

@transient var batchCount = 0

これは一時的であると宣言されているため、Spark は、それを使用する変換を宣言するときにその値を閉じようとしません。

ここで注意が必要です: DStream ation のコンテキスト内で、transformその単一varの iable から RDD を作成し、デカルト積を使用して DStream の基になる RDD と結合します。

val batchWordCounts = wordCounts.transform{ rdd => 
  batchCount = batchCount + 1

  val localCount = sparkContext.parallelize(Seq(batchCount))
  rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}

map(iableの初期値のみがキャプチャされてシリアル化されるため、単純な関数は機能しないことに注意してくださいvar。したがって、DStream データを見ると、カウンターが増加していないように見えます。

最後に、データが適切な形になったので、Cassandra に保存します。

batchWordCounts.saveToCassandra("keyspace", "wordcounts")
于 2016-11-03T09:30:27.863 に答える
-1

RDD を既存の Cassandra テーブルに保存しようとしているため、すべての主キー列の値を RDD に含める必要があります。

できることは、以下の方法を使用してRDDを新しいテーブルに保存できることです。

saveAsCassandraTable or saveAsCassandraTableEx

詳細については、これを参照してください。

于 2016-11-03T04:17:10.537 に答える