3

簡単に言えば

既存の Observable (まだ完了していない) が与えられた場合、関連付けられたサブスクライバー (サブスクライブに渡された関数) を取得して、代わりに別の Observable にサブスクライブさせる方法はありますか?

環境

私のアプリケーションのサービスは、SeverEvent接続を作成し、 ConnectableObservableをプロキシ接続に返し、publishオペレーターを使用してマルチキャストを許可するのに役立ちます。このサービスは、内部ストアを介して既存の接続を追跡します。

store: {[key: string]: ConnectionTracker};

// …

interface ConnectionTracker {
    url: string;
    eventSource: EventSource;
    observable: rx.ConnectableObservable<any>;
    subscription: rx.Subscription;
    observer: rx.Observer<any>;
    data?: any; // Arbitrary data
}

接続の作成時に、関連付けられたトラッカーが既に存在する場合 (ID は接続のエンドポイントを使用して作成されます)、サービスは次のことを行う必要があります。

  • ok既存のトラッカーのServerEvent接続を閉じます
  • ok新しいSerevrEvent接続を開きます (したがって、新しい ConnectableObservable になります)
  • 既存のトラッカーの Observable を新しい Observable に置き換えますが、代わりに既存のサブスクライバーを新しい Observable にサブスクライブさせます

ConnectionTrackerを作成するコード部分は次のとおりです

/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
    let fullUri = endpoint + (queryString ? `?${queryString}` : '')
        , tracker = this.findTrackerByEndpoint(endpoint) || {
            observable: null,
            fullUri: fullUri,
            eventSource: null,
            observer: null,
            subscription: null
        }
    ;

    // Tracker exists
    if (tracker.observable !== null) {
        // If fullUri hasn't changed, use the tracker as is
        if (tracker.fullUri === fullUri) {
            return tracker;
        }

        // At this point, we know "fullUri" has changed, the tracker's
        // connection should be replaced with a fresh one

// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
//   them subscribe to the new Observable instead (created down below)

        // Terminate previous connection and clean related resouces
        tracker.observer.complete();
        tracker.eventSource.close();
    }

    tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
    tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
            // Executed once
            tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
            tracker.eventSource.onerror = e => observer.error(e);
            // Keep track of the observer
            tracker.observer = observer;
        })
        // Transform Observable into a ConnectableObservable for multicast
        .publish()
    ;

    // Start emitting right away and also keep a reference to 
    // proxy subscription for later disposal
    tracker.subscription = tracker.observable.connect();

    return tracker;
}

ありがとうございました。

4

2 に答える 2