0

2 回サブスクライブする PublishSubject があります。最初のサブスクライバーは処理されたアイテムの数をカウントするだけで、この値は常にオブザーバーを介して送信したものと一致します。ただし、他のサブスクライバーはバッファーを使用しており、私 (75%) はオブザーバーを通過したすべてのアイテムを受信しないことがよくあります。バッファを間違って使用していますか? すべてのアイテムが処理されることを確認するために、オブザーバーへの送信を停止した後、タイムスパンよりも長く待機しています。

Integer downloads1 = 0;
Integer downloads2 = 0;
PublishSubject<Object> subject = PublishSubject.create();
// this subscriber count matches the expected
subject.subscribe(s -> {
  synchronized (downloads1) {
    downloads1 += 1;
  }
});
// this subscriber seems to miss items about 75% of the time
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> {
  synchronized (downloads2) {
    downloads2 += list.size();
  }
});
4

1 に答える 1

0

たぶん、このバグに遭遇しています: https://github.com/Netflix/RxJava/issues/534

ところで、サブスクライブの代わりにreduce(R initialValue, Func2<R,? super T,R> accumulator)、初期値 0 で使用する必要があります。そうすれば、自分で同期を行う必要はありません。

于 2014-04-17T10:31:16.013 に答える