2

マルチキャスト処理にObservable.Publishを使用するライフサイクルについて少し混乱しています。どのように接続を正しく使用する必要がありますか?直感に反して、マルチキャストオブザーバーがサブスクリプションを開始するためにconnectを呼び出す必要がないことがわかりました。

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);

// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()

// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?

編集

私のパズルは、IConnectableObservableでConnectを明示的に呼び出さなかったときに正常にサブスクライブできた理由です。ただし、暗黙的にConnectを呼び出すIConnectableObservableでAwaitを呼び出していました

Public Async Function MonitorMeasurements() As Task


    Dim cts = New CancellationTokenSource

    Try
        Using dialog = New TaskDialog(Of Unit)(cts)

            Dim measurementPoints = 
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()

            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()

            Await measurementPoints
        End Using
    Catch ex As TimeoutException
        MessageBox.Show(ex.Message)
    Catch ex As Exception
        MessageBox.Show(ex.Message)
    End Try

End Function

TaskDialogは、キャンセルボタンが押されたときにCancelObservedと呼ばれるオブザーバブルを公開することに注意してください。

解決

解決策は@astiによるリンクに投稿されています。これがそのリンクのRXチームからの引用です

awaitを使用すると、サブスクリプションが発生するため、監視可能なシーケンスがホットになります。このリリースには、IConnectableObservableのサポートが含まれています。これにより、シーケンスがソースに接続され、サブスクライブされます。Connect呼び出しがないと、await操作は完了しません。

4

2 に答える 2

6

Publishソース上で、IConnectableObservable<T>本質的にメソッドを使用するをIObservable<T>返しConnectます。を使用ConnectIDisposableて、ソースへのサブスクリプションを制御するために戻ります。

Rxは、ファイアアンドフォーゲットシステムとして設計されています。サブスクリプションは、明示的に破棄するか、完了するかエラーになるまで終了しません。

つまり、disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...) -サブスクリプションdisp0, disp1は明示的に破棄されるまで終了しません-これはマルチキャストソースへの接続とは無関係です。

以下のパイプラインを邪魔することなく、接続と切断を行うことができます。接続を手動で管理することを心配しない簡単な方法は.Publish().RefCount()、少なくとも1人のオブザーバーがまだサブスクライブしている限り接続を維持するを使用することです。これは、オブザーバブルのウォーミングアップとして知られています。


質問の編集のために更新されました

OPはを呼び出しawaitていましたIConnectableObservable<T>

Rxのリリースノートから:

.. awaitを使用すると、サブスクリプションが発生するため、監視可能なシーケンスがホットになります。このリリースには、IConnectableObservableのサポートが含まれています。これにより、シーケンスがソースに接続され、サブスクライブされます。Connect呼び出しがないと、await操作は完了しません。

例(同じページから取得)

static async  void Foo()
{
    var xs = Observable.Defer(() =>
    {
        Console.WriteLine("Operation started!");
        return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
    });

    var ys = xs.Publish();

    // This doesn't trigger a connection with the source yet.
    ys.Subscribe(x => Console.WriteLine("Value = " + x));

    // During the asynchronous sleep, nothing will be printed.
    await Task.Delay(5000);

    // Awaiting causes the connection to be made. Values will be printed now,
    // and the code below will return 9 after 10 seconds.
    var y =  await ys;
    Console.WriteLine("Await result = " + y);
}
于 2012-10-22T08:48:25.277 に答える
4

公開すると、サブスクリプションを共有できます。これは明らかに、Coldの観測可能なシーケンスをHotにするのに最も役立ちます。つまり、サブスクリプションの副作用(おそらくネットワークへの接続)が発生するシーケンスを取得し、副作用が1回実行され、シーケンスの結果がコンシューマー間で共有されるようにします。

実際には、コールドシーケンスで公開を呼び出し、コンシューマーをサブスクライブしてから、サブスクリプション後に公開されたシーケンスを接続して、競合状態を緩和します。

つまり、基本的に、上記で行ったことです。

サブジェクト、FromEventPattern、またはすでに公開され接続されているものなど、すでにホットなシーケンスにはほとんど意味がありません。

Connect()メソッドから値を破棄すると、シーケンスが「切断」され、コンシューマーがそれ以上値を取得できなくなります。これらのいずれかを早期に切り離したい場合は、コンシューマーサブスクリプションを破棄することもできます。

これらすべてを言っても、あなたは正しいことをしているように見えます。あなたが見ている問題は何ですか?すでにHOTシーケンスに接続していると想定しています。

于 2012-10-22T08:36:10.487 に答える