0

このコンテキストでは、Couchbase を使用して、2 レベルのドキュメント ストアに REST CRUD サービスを実装しています。データ モデルは、0 個以上のアイテム ドキュメントを指すインデックス ドキュメントです。インデックス ドキュメントは、非同期の get を使用して Observable として取得されます。この後に、アイテム ドキュメントごとに 0 個以上の ID を取得する .flatMap() が続きます。async get は Observable を返すので、今作成している Observable は Observable> です。「Observableを発行するObservableを受け取り、その出力を単一のObservableの出力にマージする」.merge()演算子をチェーンして、ReactiveXドキュメントを引用したい:)次に、その単一に.subscribe()しますアイテム ドキュメントを取得するための Observable。.merge() 演算子には多くのシグネチャがありますが、次のように一連の演算子で使用する方法がわかりません。

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create Observable that gets and emits zero or more 
        // Observable<Observable<JsonDocument>> ie. bucket.async().gets
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                itemArray.forEach(
                    (jsonObject) -> {
                        String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                        observer.onNext( 
                            bucket.async().get(itemId)
                        );
                    }
                    }
                );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return items;
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.merge( ???????? )
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

編集:

あなたはおそらく私が反応的な初心者だと思ったでしょう。@akarnokd からの回答は、自分がやろうとしていたことがばかげていることに気付くのに役立ちました。Observable<Observable<JsonDocument>>解決策は、クロージャー内のアイテムからの排出量をマージし、documentその結果を返すことです。これにより、次の結果が出力JsonDocumentsされますflatMap

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create Observable that gets and emits zero or more 
        // Observable<Observable<JsonDocument>> ie. bucket.async().gets
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                itemArray.forEach(
                    (jsonObject) -> {
                        String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                        observer.onNext( 
                            bucket.async().get(itemId)
                        );
                    }
                    }
                );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return Observable.merge(items);
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

テスト済みで動作します:)

4

2 に答える 2

0

呼び出しtoList()て、放出されたすべてのアイテムを 1 つのリストに集めることができます。私はそれをテストしていませんが、次のようなものはどうですか:

bucket.async()
  .get(id)
  .flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
  .flatMap(bucket::get)
  .toList()
  .subscribe(results -> /* list of documents */);
于 2015-10-21T22:34:24.700 に答える
0

Java の表現上の制限により、merge()に適用できるパラメーターなしの演算子を持つことはできませんObservble<Observable<T>>。C# などの拡張メソッドが必要になります。

次善の策は、identity を行うことですflatMap

// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
于 2015-10-21T14:03:37.483 に答える