2 回サブスクライブする PublishSubject があります。最初のサブスクライバーは処理されたアイテムの数をカウントするだけで、この値は常にオブザーバーを介して送信したものと一致します。ただし、他のサブスクライバーはバッファーを使用しており、私 (75%) はオブザーバーを通過したすべてのアイテムを受信しないことがよくあります。バッファを間違って使用していますか? すべてのアイテムが処理されることを確認するために、オブザーバーへの送信を停止した後、タイムスパンよりも長く待機しています。
Integer downloads1 = 0;
Integer downloads2 = 0;
PublishSubject<Object> subject = PublishSubject.create();
// this subscriber count matches the expected
subject.subscribe(s -> {
synchronized (downloads1) {
downloads1 += 1;
}
});
// this subscriber seems to miss items about 75% of the time
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> {
synchronized (downloads2) {
downloads2 += list.size();
}
});