簡単に言えば
既存の Observable (まだ完了していない) が与えられた場合、関連付けられたサブスクライバー (サブスクライブに渡された関数) を取得して、代わりに別の Observable にサブスクライブさせる方法はありますか?
環境
私のアプリケーションのサービスは、SeverEvent接続を作成し、 ConnectableObservableをプロキシ接続に返し、publishオペレーターを使用してマルチキャストを許可するのに役立ちます。このサービスは、内部ストアを介して既存の接続を追跡します。
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
url: string;
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
接続の作成時に、関連付けられたトラッカーが既に存在する場合 (ID は接続のエンドポイントを使用して作成されます)、サービスは次のことを行う必要があります。
- ok
既存のトラッカーのServerEvent接続を閉じます - ok
新しいSerevrEvent接続を開きます (したがって、新しい ConnectableObservable になります) - 既存のトラッカーの Observable を新しい Observable に置き換えますが、代わりに既存のサブスクライバーを新しい Observable にサブスクライブさせます
ConnectionTrackerを作成するコード部分は次のとおりです
/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resouces
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
ありがとうございました。