RxJava でデータ同期ジョブを作成しているときに、説明できない奇妙な動作を発見しました。私はRxJavaの初心者であり、助けていただければ幸いです。
簡単に言うと、私の仕事は非常に単純です。要素 ID のリストがあり、Web サービスを呼び出して各要素を ID で取得し、いくつかの処理を行い、複数の呼び出しを行ってデータを DB にプッシュします。データの読み込みはデータの保存よりも高速であるため、OutOfMemory エラーが発生しました。
私のコードはほとんど「失敗した」テストのように見えますが、いくつかのテストを実行すると、次の行を削除することに気付きました:
flatMap(dt -> Observable.just(dt))
それを機能させます。テスト出力の失敗は、消費されていないアイテムが積み重なり、これが OutOfMemory につながることを明確に示しています。実際のテスト出力は、プロデューサーが常にコンシューマーを待機しているため、これが OutOfMemory につながることは決してないことを示しています。
public static class DataStore {
public Integer myVal;
public byte[] myBigData;
public DataStore(Integer myVal) {
this.myVal = myVal;
this.myBigData = new byte[1000000];
}
}
@Test
public void working() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
@Test
public void failing() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(dt -> Observable.just(dt))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
private Observable<DataStore> produce(final int value) {
return Observable.<DataStore>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(200); //Here I synchronous call WS to retrieve data
s.onNext(new DataStore(value));
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}
private Observable<Boolean> consume(DataStore value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(1000); //Here I synchronous call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
この動作の背後にある説明は何ですか? 私の実際のケースでは Observable.from(someListOfItme) である Observable.just(dt)) を削除せずに失敗したテストを解決するにはどうすればよいですか