RxJavaを使用して、同じタイプを発行する別のObservableと交差しようとしている多数のアイテムを発行する1つのソースObservableがあります。いくつかのオプションを検討した結果、物事を構造化するための最も一貫した方法は次のようになります。
Observable<String> source = ...emits 20 items
Observable.create(subscriber -> {
source
.buffer(5)
.subscribe(things -> {
tocheck.getMatches(things) //emits 3 matches
.subscribe(subscriber::onNext, subscriber::onError, () -> {});
}, subscriber::onError, subscriber::onCompleted));
ここで期待される出力は、結果の Observable をサブスクライブすると、12 個のアイテムが発行されることです。getMatches の契約により、結果をバッファリングする必要があります。
一見、これは機能するように見えますが、最もクリーンな方法とは思えません。パフォーマンス上の理由から、すべてのアイテムで交差チェックを実行できないため、ここではフィルターが適用されないようです。flatMap をいじってみましたが、ソース オブザーバブルからの完了通知ではなく、getMatches オブザーバブルがストリームを完了します。
これを構造化するより良い方法はありますか?
編集:このスタイルのコードで何が起こっていたのかを明確にするために:
Observable<String> source = ...emits 20 items
source
.buffer(5)
.flatMap(this::getMatches); //final observable would emit a total of 12 items
これは明らかにすっきりしていますが、ログを追加すると (元のスニペットと同じデータ サイズを仮定すると:
source
.doOnEach(notification -> {
log.trace("Processing {}", notification.getValue());
})
.buffer(5)
.flatMap(this::getMatches)
.doOnEach(notification -> {
log.trace("Processing after match {}", notification.getValue());
});
「処理中」ログの 20 のインスタンスを取得した後、不思議なことに、「処理後」からわずか数行のログ行しか取得しません (12 と予想される場合)。本来よりも早く on complete を呼び出しているようです。多分私は何か間違った構造をしていますか?