0

ソースから一定数の要素が流れ込むユースケースがあります。これらの要素が処理され、この場合は ActorSubscriber である Sink があります。すべての要素を送信した後、Source は完了イベントを通知します。このイベントはシンクに伝達され、シンクは onComplete イベントを起動してストリーム ワークフローを閉じます。これはすべて、要素がまだ中間ステップで処理されている間に発生します。

中間ステップが処理されたデータを返したときに処理が完了すると、onComplete イベントがトリガーされたために既に閉じられているため、サブスクライバーは見つかりません。アクター サブスクライバーの onComplete イベントを破棄し、常に開いたままにする方法はありますか。

コード スニペット:

val sample1 = //Sample time series

Source(List(sample1,sample2,sample3))
    .map(m => akka.util.ByteString( m.getBytes ))
    .to(detectionWorkflow(context))
    .run()

 val intakeBuffer = b.add(
    Flow[ByteString]
      .buffer(conf.tcpInboundBufferSize, OverflowStrategy.backpressure)
  )

  val timeSeries = b.add(
    Flow[ByteString]
    .via( watch("unpacking") )
    .via( unmarshalTimeSeriesData )
    .via( watch("unpacked") )
  )

  val scoring = b.add(
   OutlierScoringModel.scoringGraph(planRouterRef = context.planRouter, config = conf)
  )

 intakeBuffer ~> timeSeries ~> scoring.in
                                scoring.out0 /*~> publishBuffer*/  ~> Sink.actorSubscriber(SpotlightSubscriber.props)
                                scoring.out1 ~> logUnrecognized ~> termUnrecognized

アクター加入者:

class SpotlightSubscriber extends ActorSubscriber with ActorLogging {
protected val logger: Logger = Logger(LoggerFactory.getLogger("SpotlightSubscriber"))
 protected def requestStrategy = WatermarkRequestStrategy(1)
def receive = {
case OnNext(outlier: Outliers) =>
  log.debug("[SpotlightSubscriber] Received : {}", outlier)
case OnError(err: Exception) =>
  log.error(err, "[SpotlightSubscriber] Received Exception in Spotlight Stream")
  context.stop(self)
case OnComplete =>
  log.info("[SpotlightSubscriber] Spotlight Stream Completed!")
  context.stop(self)
case _ => }}

スコアリングは処理作業を行います。起こっていることは、3 つの要素を持つソースが完全なイベントをシンクに送信することです。ソースから完了イベントを受信すると、actorSubscriber の onComplete イベントがトリガーされます。処理後、スコアリング エンジンは処理された結果をactorSubscriberに送り返しますが、もはやアクティブではないため、deadLetter メッセージを受け取ります。

4

0 に答える 0