このコンテキストでは、Couchbase を使用して、2 レベルのドキュメント ストアに REST CRUD サービスを実装しています。データ モデルは、0 個以上のアイテム ドキュメントを指すインデックス ドキュメントです。インデックス ドキュメントは、非同期の get を使用して Observable として取得されます。この後に、アイテム ドキュメントごとに 0 個以上の ID を取得する .flatMap() が続きます。async get は Observable を返すので、今作成している Observable は Observable> です。「Observableを発行するObservableを受け取り、その出力を単一のObservableの出力にマージする」.merge()演算子をチェーンして、ReactiveXドキュメントを引用したい:)次に、その単一に.subscribe()しますアイテム ドキュメントを取得するための Observable。.merge() 演算子には多くのシグネチャがありますが、次のように一連の演算子で使用する方法がわかりません。
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
編集:
あなたはおそらく私が反応的な初心者だと思ったでしょう。@akarnokd からの回答は、自分がやろうとしていたことがばかげていることに気付くのに役立ちました。Observable<Observable<JsonDocument>>
解決策は、クロージャー内のアイテムからの排出量をマージし、document
その結果を返すことです。これにより、次の結果が出力JsonDocuments
されますflatMap
。
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
テスト済みで動作します:)