アグリゲーションのサブスクリプションを保持する代わりに、ウィンドウ処理を操作できます。これは、最後のウィンドウが到着するまで接続を維持し、パーティション分割に時間がかかりすぎる場合に備えて、タイムアウトを使用して切断します。
ここでは別のクラスが使用されています。使用Create
すると Auto Detach になるためです。これにより、dispose 呼び出しが行われた後、オブザーバーがすぐに切断されます。基本的に、Dispose の意味は、ここで変更されたものです。
public static IObservable<T> DeferDisconnection<T>(this IObservable<T> observable, TimeSpan timeout)
{
return new ClosingObservable<T>(observable, timeout);
}
public class ClosingObservable<T> : IObservable<T>
{
private readonly IConnectableObservable<T> Source;
private readonly IDisposable Subscription;
private readonly TimeSpan Timeout;
public ClosingObservable(IObservable<T> observable, TimeSpan timeout)
{
Timeout = timeout;
Source = observable.Publish();
Subscription = Source.Connect();
}
public IDisposable Subscribe(IObserver<T> observer)
{
Source.Subscribe(observer);
return Disposable.Create(() => Source.Select(_ => new Unit())
.Amb(Observable.Timer(Timeout).Select(_ => new Unit()))
.Subscribe(_ => Subscription.Dispose())
);
}
}
テスト:
var disposable =
Observable.Interval(TimeSpan.FromSeconds(2))
.Do(Console.WriteLine)
.DeferDisconnection(TimeSpan.FromSeconds(5))
.Subscribe();
Console.ReadLine();
disposable.Dispose();
Console.ReadLine();