0

内部で mapWithState 関数を使用するストリーミングアプリケーションを開発しています。

チェックポイントされたデータ（mapWithState の状態データ）のチェックポイント間隔を手動で設定する必要があります。

以下が私のサンプルコードです。

var newContextCreated: Boolean = false // Becomes true once creatingFunc() builds a fresh StreamingContext

/**
 * Creates and wires up a new StreamingContext for the mapWithState word-count demo.
 *
 * The function takes no arguments, so it can be handed to a get-or-create factory
 * (presumably StreamingContext.getActiveOrCreate — confirm at the call site).
 *
 * Relies on names defined outside this function: `sc`, `batchIntervalSeconds`,
 * `eventsPerSecond`, `stateSpec`, `DummySource`, the outer `newContextCreated` flag,
 * and whatever implicits make `rdd.toDF` available.
 *
 * Side effects: prints a message and sets `newContextCreated = true`.
 *
 * @return a configured StreamingContext; this function does not start it
 */
def creatingFunc(): StreamingContext = {
  // One Duration shared by the context and the state checkpoint interval below.
  val batchInterval = Seconds(batchIntervalSeconds)

  // How often the mapWithState state is checkpointed, expressed in batches.
  // 10 is Spark's built-in default for mapWithState streams; lower it for faster
  // recovery, raise it to reduce checkpoint I/O. Keep it a whole number of batches,
  // because Spark requires the interval to be a multiple of the batch interval.
  val stateCheckpointBatches = 10

  val ssc = new StreamingContext(sc, batchInterval)

  // Receiver-based test source; the rate is presumably `eventsPerSecond` lines per
  // second (DummySource is defined elsewhere — confirm).
  val stream = ssc.receiverStream(new DummySource(eventsPerSecond))

  // Split lines into words and pair each word with a count of 1.
  val wordStream = stream.flatMap(_.split(" ")).map(word => (word, 1))

  // Stream emitted by the state-mapping function in `stateSpec`. Per the original
  // author, every input record is emitted with its updated value, so this stream has
  // as many records as the input dstream.
  val wordCountStateStream = wordStream.mapWithState(stateSpec)

  // Set the state checkpoint interval explicitly rather than leaving it implicit.
  // NOTE(review): DStream.checkpoint also calls persist(), which may change the state
  // stream's storage level — verify against the Spark version in use.
  wordCountStateStream.checkpoint(batchInterval * stateCheckpointBatches)
  wordCountStateStream.print()

  // Snapshot of the full state for each batch: one entry per key.
  val stateSnapshotStream = wordCountStateStream.stateSnapshots()
  stateSnapshotStream.foreachRDD { rdd =>
    // registerTempTable is deprecated since Spark 2.0 (createOrReplaceTempView replaces
    // it); kept here because the target Spark version is not visible.
    rdd.toDF("word", "count").registerTempTable("batch_word_count")
  }

  // Keep generated RDDs for a minute so the temp table can still be queried interactively.
  ssc.remember(Minutes(1))

  // Checkpoint directory; mapWithState requires one to be set.
  ssc.checkpoint("dbfs:/streaming/trackstate/100")

  println("Creating function called to create new StreamingContext")
  newContextCreated = true
  ssc
}
4

0 に答える 0