バックプレッシャーを実装したカスタム Flowable を作成する必要があります。私はある種のページングを達成しようとしています。つまり、ダウンストリームが 5 アイテムをリクエストした場合、アイテム 0 ~ 5 について「データ ソースに問い合わせます」。その後、ダウンストリームが別の 5 アイテムを必要とする場合、アイテム 5 ~ 10 を取得して送り返します。
これまでに見つけた最善の方法はメソッドを使用することですが、ダウンストリームが要求しているアイテムの数Flowable.generate
を取得する方法が (私の知る限り) ない理由が本当にわかりません。requested
generatorのプロパティを使用してstate
、最後に要求されたアイテムのインデックスを保存できるので、新しく要求されたアイテムの数だけが必要になります。BiFunction で取得した emmiter インスタンスは、apply
からGeneratorSubscription
拡張されていAtomicLong
ます。したがって、 emmiter をキャストするAtomicLong
と、要求された番号を取得できます。しかし、これが「推奨される」方法ではないことはわかっています。
一方、使用すると、メソッドFlowable.create
を持つ FlowableEmitter を取得しますlong requested()
。を使用するgenerate
ことは、私のユースケースにより適していますが、今では「正しい」使用方法にも興味がありますFlowable.generate
。
たぶん私はすべてを考えすぎているので、正しい方向に向けてください。:) ありがとうございました。
実際のコードは次のようになります (Kotlin の場合):
Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
val requested = (emitter as AtomicLong).get().toInt() //this is bull*hit
val end = start + requested
//get items [start to end] -> items
emmiter.onNext(items)
end /*return the new state*/
})