akka ストリームの ActorPublisher の例をたどったところ、ときどき次のメッセージが表示されました。
java.lang.IllegalStateException: ストリームが要素を要求していない場合、onNext は許可されません。totalDemand は 0 でした
ドキュメントを見て、彼らは説明します:
onNext を呼び出して要素をストリームに送信します。ストリーム サブスクライバーから要求された数の要素を送信できます。この金額は、totalDemand で照会できます。isActive で totalDemand>0 の場合にのみ onNext を使用できます。それ以外の場合、onNext は IllegalStateException をスローします。
ストリーム サブスクライバーがさらに多くの要素を要求すると、ActorPublisherMessage.Request メッセージがこのアクターに配信され、そのイベントに対応できます。totalDemand は自動的に更新されます。
totalDemand がゼロにならないようにするにはどうすればよいですか? このエラーが発生したとき、送信しようとしていたメッセージを失いました。
これが私が従ってきた例です:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
これは私のクラスのテストです
object Test extends App {
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorFlowMaterializer()
val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val publisher = kafka.consume("test", "groupName", new StringDecoder())
val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")
Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))
}
さて、私は kafka からメッセージを受け取り、WorkerActor に渡していますが、1 秒あたり 10 件のメッセージを Kafka に送信すると、このエラーが原因でメッセージの一部が失われます。
アップデート
ここで説明されているエラーに直面していました(同じライブラリを使用):
https://github.com/softwaremill/reactive-kafka/issues/11
私はバッファを使用して解決しましたが、この PR で問題が解決するようです。