2

つまり、Spark ストリーミング コンテキストで「期間」を値に設定する代わりに、(ソケット クローズ時間 - ソケット オープン時間) に設定したい

4

2 に答える 2

4

StreamingListnerインターフェイスを使用して、切断されているレシーバーをリッスンし、ストリーミング コンテキストをシャットダウンできます。

これは次のように使用されます

// define listener
class MyListener extends StreamingListener {
  override def onReceiverStopped(...) {
    streamingContext.stop()
  }
} 

// attach listener
streamingContext. addStreamingListener(new MyListener())
于 2014-06-24T23:49:03.383 に答える
0

ストリーミング リスナーで ssc.stop() を実行すると、ストリーミングがエラーになるか終了したときに、この関数がトリガーされて停止します。バッチの開始時、終了時など、別のトリガーがあります。

    ssc.addStreamingListener(new StreamingListener() {

    @Override
      public void onReceiverError(StreamingListenerReceiverError receiverError) {
      System.out.println("Do what u want");
      ssc.stop();
      }

    @Override
      public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
      System.out.println("Do what u want");
      ssc.stop(true,true); 
      }
    }

    ssc.start();
于 2015-11-16T08:38:56.050 に答える