私はかなり基本的な質問であると信じていることにしばらく苦労してきました。
Flowable
ネットワークからアイテムのバンドルを取得してそれらを発行する があります。
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retrieveNext();
while (item != null) {
emitter.onNext(item);
if (item.isLast) {
break;
}
item = retrieveNext();
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
// ...
}
});
リクエストごとに動作する必要があります。つまり、必要に応じSubscription
て呼び出しますsubscription.request(n)
。
Backpressureに関する記事(「リアクティブ プル」セクション)Observer
では、 の視点が説明されており、簡単に言及されています。
ただし、[リアクティブ プル] が機能するには、オブザーバブル A と B が request() に正しく応答する必要があります <...> そのようなサポートは、オブザーバブルの要件ではありません
このようなサポートが原則としてどのように達成されるかについては、詳細には触れません。
Observable
そのような/を実装するための一般的なパターンはありFlowable
ますか? 良い例や参照が見つかりませんでした。私が考えることができる1つの悪いオプションは、while
ループ内emitter.requested() == 0
でロックしdoOnRequest()
、別のスレッドからロックを解除するモニターを持つことです。しかし、このアプローチは面倒に感じます。
では、この「スローダウン」は一般的にどのように達成されるのでしょうか?
ありがとう。