1

akka ストリームの ActorPublisher の例をたどったところ、ときどき次のメッセージが表示されました。

java.lang.IllegalStateException: ストリームが要素を要求していない場合、onNext は許可されません。totalDemand は 0 でした

ドキュメントを見て、彼らは説明します:

onNext を呼び出して要素をストリームに送信します。ストリーム サブスクライバーから要求された数の要素を送信できます。この金額は、totalDemand で照会できます。isActive で totalDemand>0 の場合にのみ onNext を使用できます。それ以外の場合、onNext は IllegalStateException をスローします。

ストリーム サブスクライバーがさらに多くの要素を要求すると、ActorPublisherMessage.Request メッセージがこのアクターに配信され、そのイベントに対応できます。totalDemand は自動的に更新されます。

totalDemand がゼロにならないようにするにはどうすればよいですか? このエラーが発生したとき、送信しようとしていたメッセージを失いました。

これが私が従ってきた例です:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

これは私のクラスのテストです

object Test extends App {

  implicit val actorSystem = ActorSystem("ReactiveKafka")
  implicit val materializer = ActorFlowMaterializer()

  val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
  val publisher = kafka.consume("test", "groupName", new StringDecoder())

  val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")

  Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))

}

さて、私は kafka からメッセージを受け取り、WorkerActor に渡していますが、1 秒あたり 10 件のメッセージを Kafka に送信すると、このエラーが原因でメッセージの一部が失われます。

アップデート

ここで説明されているエラーに直面していました(同じライブラリを使用):

https://github.com/softwaremill/reactive-kafka/issues/11

私はバッファを使用して解決しましたが、この PR で問題が解決するようです。

https://github.com/softwaremill/reactive-kafka/pull/13

4

1 に答える 1

2

下流のシンクに需要がない場合、唯一の選択肢は

  1. Workerより多くの需要が来るまでソースがメッセージの生成を停止できるように、需要がないことをデータ ソース フィードに伝えます (リアクティブ ソリューション)。
  2. バッファがいっぱいになる可能性のあるシンクからの要求を受け取り、とにかくメッセージをドロップするまで、メッセージをバッファリングします。
  3. 需要が 0 のときにメッセージをドロップします (これが現在の実装のようです)。

しかし、「背圧」の要点は、要求がないときに onNext が呼び出されないようにすることです。

上記のバッファリングオプションを実装するには、アクタの内部または外部でバッファリングできます。

  • 内部バッファー: ActorPublisherにフィードするアクターでのバッファー処理の例については 、ドキュメントの「ActorPublisher」の例を参照してください。
  • 外部バッファ: バッファリングされたマテリアライザーを使用するか、Flow.bufferストリームで外部バッファを使用します。
于 2015-06-18T16:25:09.893 に答える