1

私は、Spark 2.0.2 の真新しい (そして「アルファ」とタグ付けされた) 構造化ストリーミングを使用して、kafka トピックからメッセージを読み取り、そこからいくつかの cassandra テーブルを更新しています。

val readStream = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "maxwell")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
  .as[KafkaMessage]
  .map(<transform KafkaMessage to Company>)

val writeStream = readStream
  .writeStream
  .queryName("CompanyUpdatesInCassandra")
  .foreach(new ForeachWriter[Company] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(company: Company): Unit = {
      ...
    }

    def close(errorOrNull: Throwable): Unit = {}
  }
  .start
  .awaitTermination

また、sparkSession でチェックポイントの場所 ("spark.sql.streaming.checkpointLocation") を構成しました。これにより、ストリーミング アプリがダウンしている間に届いたメッセージを、再開するとすぐに受け取ることができます。

ただし、このチェックポイントの場所を構成して以来、再開時に、以前のバッチの最後のメッセージが失敗することなく正しく処理されていても、一貫して処理されることに気付きました。

ここで何が間違っているのか分かりますか?これは非常に一般的な使用例のようです。

より詳しい情報:

関連するログを参照してください (トピック 5876 は、前のバッチで正常に処理された最後のトピックです)。

[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]}
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)]
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map()
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)]
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31)
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0]  1

また、ストリームを強制終了するときは、データの損失を避けるために適切に停止するようにします。

sys.ShutdownHookThread
{
  writeStream.stop
  sparkSession.stop
}
4

1 に答える 1

3

現在、構造化ストリーミングは、新しいオフセットが生成されたときに状態をチェックポイントします。あなたが説明したケースが予想されるので、最後にコミットされたバッチは回復後に再処理される可能性があります。ただし、これは内部実装です。バッチをコミットするときにチェックポイントを実行すると、チェックポイントが失敗する可能性があり、シンクの ForeachWriter もこのケースを処理する必要があるとします。

一般に、シンクは常に冪等でなければなりません。

更新: Spark 2.2.0 では、構造化ストリーミングは、成功した場合、回復後にバッチを再実行しません。

于 2016-12-26T00:34:49.700 に答える