2

RxJavaを使用して、同じタイプを発行する別のObservableと交差しようとしている多数のアイテムを発行する1つのソースObservableがあります。いくつかのオプションを検討した結果、物事を構造化するための最も一貫した方法は次のようになります。

Observable<String> source = ...emits 20 items

Observable.create(subscriber -> {
    source
        .buffer(5)
        .subscribe(things -> {
            tocheck.getMatches(things) //emits 3 matches
                .subscribe(subscriber::onNext, subscriber::onError, () -> {});
        }, subscriber::onError, subscriber::onCompleted));

ここで期待される出力は、結果の Observable をサブスクライブすると、12 個のアイテムが発行されることです。getMatches の契約により、結果をバッファリングする必要があります。

一見、これは機能するように見えますが、最もクリーンな方法とは思えません。パフォーマンス上の理由から、すべてのアイテムで交差チェックを実行できないため、ここではフィルターが適用されないようです。flatMap をいじってみましたが、ソース オブザーバブルからの完了通知ではなく、getMatches オブザーバブルがストリームを完了します。

これを構造化するより良い方法はありますか?

編集:このスタイルのコードで何が起こっていたのかを明確にするために:

Observable<String> source = ...emits 20 items

source
    .buffer(5)
    .flatMap(this::getMatches);  //final observable would emit a total of 12 items

これは明らかにすっきりしていますが、ログを追加すると (元のスニペットと同じデータ サイズを仮定すると:

source
    .doOnEach(notification -> {
        log.trace("Processing {}", notification.getValue());
    })
    .buffer(5)
    .flatMap(this::getMatches)
    .doOnEach(notification -> {
        log.trace("Processing after match {}", notification.getValue());
    });

「処理中」ログの 20 のインスタンスを取得した後、不思議なことに、「処理後」からわずか数行のログ行しか取得しません (12 と予想される場合)。本来よりも早く on complete を呼び出しているようです。多分私は何か間違った構造をしていますか?

4

1 に答える 1