私は Reactor を使用しており、いくつかのイベントを発行するフラックスを作成しています。私の問題は、フラックスに非フィルター サブスクライバーを追加しない限り、フィルターを使用して作成したサブスクライバーがしばらくすると失敗することです。
import reactor.core.publisher.EmitterProcessor
class PublishSubscribe {
companion object {
@JvmStatic
fun main(args: Array<String>) {
val publisher = EmitterProcessor.create<String>().connect()
writeAndGet(publisher)
writeAndGet(publisher)
writeAndGet(publisher)
}
fun writeAndGet(publisher: EmitterProcessor<String>) {
val result = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
val result2 = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
Thread.sleep(1000)
publisher.onNext("unu")
publisher.onNext("end")
try {
println("X=" + result.blockMillis(3000))
println("Y=" + result2.blockMillis(3000))
} catch (e: Exception) {
e.printStackTrace()
}
println(result.isTerminated)
println(result2.isTerminated)
println("---")
}
}
}
追加のサブスクライバーがいる場合、コードは正常に機能します。
...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe() //this solves the issue
writeAndGet(publisher)
...
私が間違っていることについてのアイデアはありますか?
よろしくお願いします