0

アプリにレトロフィット サービスに支えられたデータ レイヤーがあります。(これまでのところ、ネットワーク経由の永続性のみがあります。開発が進んだら、オフライン ファーストのローカル ストレージを追加します)

Observable<List<Item>>サーバーを呼び出すとRetrofit から が返されます。これはうまくいきます。サブスクライバーでは、サブスクライブ時にリストを受け取り、UI にItems.

私が抱えている問題は次のとおりです。リストが(何らかの外部メカニズムによって)変更された場合、オブザーバブルがレトロフィットサービスを再クエリし、アイテムの新しいリストを発行するにはどうすればよいですか。データが古いことは承知していますが、再クエリを開始する方法がわかりません。

これが私の簡略化されたバージョンですDataManager

class DataManager {

    // Retrofit
    RetrofitItemsService itemsService;

    // The observalble provided by retrofit
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
    }

    /* Creates and stores an observable if one has not been created yet.
     * Returns the observable so that it can be subscribed to by the function caller
     */
    public Observable<List<Item>> getItems(){
        if(itemsObservable == null){
            itemsObservable = itemsService.getItems();
        }

        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            /*
             < < < Here > > >
             If someone has previously called getItems before this item was added, they now have stale data.

             How can I call something like:

             itemsObservable.refreshAllSubscribers()
            */
        });

        return call;
    }
}
4

2 に答える 2

2

ここであなたが戦っている問題は、観測可能な高温低温の違いにあります。違いを詳細に説明している素晴らしい記事がたくさんありますので、基本的なことだけを説明しましょう。

Cold Observable は、すべてのサブスクライバーに対して新しいプロデューサーを作成します。つまり、2 つの別々のサブスクライバーが同じコールド オブザーバブルにサブスクライブすると、それぞれがこれらのエミッションの異なるインスタンスを受け取ります。それらは(!) 等しいかもしれませんが、決して異なるオブジェクトではありません。ここでのケースに適用すると、各サブスクライバーは、サーバーにデータを要求してストリームに渡す独自のプロデューサーを取得します。各サブスクライバーには、独自のプロデューサーからのデータが提供されます。

ホットオブザーバブルは、プロデューサーをそのすべてのオブザーバーと共有します。たとえば、プロデューサーがオブジェクトのコレクションを反復処理している場合、2 番目のサブスクライバーを発行の途中で使用すると、後で発行された項目のみを取得することになります ( のようなオペレーターによって変更されていない場合replay)。サブスクライバーによって受信されたすべてのオブジェクトは、単一のプロデューサーからのものであるため、すべてのオブザーバーで同じインスタンスでもあります。

そのため、データを配布するにはホットなオブザーバブルが必要です。これにより、データが有効でなくなったことがわかったときに、このホットなオブザーバブルを介して 1 回送信するだけで、すべてのオブザーバーが更新に満足できるようになります。

幸いなことに、通常、コールドのオブザーバブルをホットに変えることは大したことではありません。この動作を模倣する独自のプロデューサーを作成するか、次のような一般的な演算子のいずれかを使用するかshare、ストリームを変換してそのように動作させることができます。

PublishSubject を使用してデータを更新し、次のように元のコールド オブザーバブルとマージすることをお勧めします。

class DataManager {

    .....

    PublishSubject<Boolean> refreshSubject = PublishSubject.create();

    // The observable for retrieving always fresh data
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
        itemsObservable = itemsService.getItems()
                              .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems()))
    }


    public Observable<List<Item>> getItems(){
        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            refreshSubject.onNext(true);
        });

        return call;
    }
}
于 2016-12-20T11:54:25.987 に答える
1

itemsService.getItems()は単一の要素を返すと思いますObservableので、消費者はとにかく新しいデータを取得するために再サブスクライブする必要があり、Retrofit Observables も遅延/遅延であるため、それを取得します。

データが変更されたときにトリガーできることを利用して、別の "long" を持つことができObservableます。PublishSubject

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized();

public Observable<Object> itemsChanged() {
    return onItemsChanged;
}

public Completable addItem(Item item){
    Completable call = itemsService.addItem(item);

    // prevent triggering the addItem multiple times
    // Needs RxJava 2 Extensions library for now
    // as there is no Completable.cache() or equivalent in 2.0.3
    CompletableSubject cs = CompletableSubject.create();

    call.doOnComplete(() -> onItemsChanged.onNext("changed"))
    .subscribe(cs);

    return cs;
}
于 2016-12-20T11:55:47.567 に答える