1

私は Reactor を使用しており、いくつかのイベントを発行するフラックスを作成しています。私の問題は、フラックスに非フィルター サブスクライバーを追加しない限り、フィルターを使用して作成したサブスクライバーがしばらくすると失敗することです。

import reactor.core.publisher.EmitterProcessor

class PublishSubscribe {

companion object {
    @JvmStatic
    fun main(args: Array<String>) {
        val publisher = EmitterProcessor.create<String>().connect()
        writeAndGet(publisher)
        writeAndGet(publisher)
        writeAndGet(publisher)

    }

    fun writeAndGet(publisher: EmitterProcessor<String>) {
        val result = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()

        val result2 = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()


        Thread.sleep(1000)

        publisher.onNext("unu")
        publisher.onNext("end")

        try {

            println("X=" + result.blockMillis(3000))
            println("Y=" + result2.blockMillis(3000))

        } catch (e: Exception) {
            e.printStackTrace()
        }
        println(result.isTerminated)
        println(result2.isTerminated)
        println("---")

    }

}

}

追加のサブスクライバーがいる場合、コードは正常に機能します。

...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe()  //this solves the issue
writeAndGet(publisher)
...

私が間違っていることについてのアイデアはありますか?

よろしくお願いします

4

1 に答える 1