flatMapのソースに同期ブロックを使いたい。しかし、内部ソースが作成されたときだけでなく、処理 (メソッド processItem) にもこの構造を使用する必要があります。
この Observable は 5 分ごとに呼び出されます (Observable.interval など)。
Observable.fromIterable(getSourceData())
.flatMapCompletable(item -> {
synchronized(LockManager.getInstance().getLockObject(item.id)){
return processItem(item)
.subscribeOn(Schedulers.io());
}
})
私の processesItem メソッドは次のようになります。
public Completable processItem(Item item){
return mApiSource.getItemById(item.id)
.flatMap(item ->
mItemRepository.replace(item)
)
.toCompletable();
}
どちらの内部メソッドも Single を返します。
これは、サーバーからの定期的な更新方法の一部です。プロジェクトの他のクラスから呼び出されるアイテムの変更(更新、削除)のメソッド(アイテムとの同期)を使用して、processItemメソッド呼び出し(サーバーからのアイテムの定期的な同期)をシリアル化する必要があります。
主な問題は、定期的な更新が新しく更新されたアイテムを書き換えることができることです。
実際、私はこのソリューションを使用しています:
新しいアイテムを更新するためのチェーン:
public Completable updateItem(Item item){
return Completable.fromAction(() -> {
synchronized(LockManager.getInstance().getLockObject(item.id)){
mApiSource.update(item)
.flatMap(item ->
mItemRepository.replace(item)
)
.toCompletable()
.blockingAwait();
}
})
.subscribeOn(Schedulers.io())
}
定期更新のチェーン:
Observable.fromIterable(getSourceData())
.flatMapCompletable(item -> {
Completable.fromAction(() ->{
synchronized(LockManager.getInstance().getLockObject(item.id)){
processItem(item).blockingAwait();
}
})
.subscribeOn(Schedulers.io())
});
RxJavaソリューションが明確ではないことはわかっています。
この問題のより良い解決策を知っていますか?