6

私は、 Reactive Extensions for Twitter のストリーミング APIIObservable<T>を使用して実装の作成に取り組んできました。

高レベルから HTTP 要求が送信され、接続が開かれたままになります。長さがプレフィックスされたアイテムは、消費するために送信されます。

基本的に、これはキーワードStream.ReadAsyncを使用するループ内の呼び出しです。インターフェースの実装 ( Dataflow ライブラリからのまたはブロック、それは問題ではありません。これは実装の詳細です) がこのループに渡され、実装のメソッドが呼び出され、オブザーバブルが生成されます。awaitIObserver<T>Observable.CreateIObserver<T>

ただし、このループが処理を開始する前にTask<T>、-returning メソッドの呼び出しを必要とする多くのことを行う必要があります。これらはすべて、awaitキーワードを使用して C# 5.0 でより簡単に呼び出すことができます。このようなもの:

public async Task<IObservable<string>> Create(string parameter,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

とは言っても、私は現在メソッドから a を返していますが、必要な呼び出しを容易にするためTask<IObservable<T>>に使用できる Reactive Extensions に何かが欠けていると感じていますが、 a の代わりに a を返すこともできます。awaitIObservable<T>Task<IObservable<T>>

Reactive Extensions のどのメソッドを使用すると、作成メソッドから戻る前にメソッドを待機する必要があるオブザーバブルを作成できますか?

私が見つけた最も近いものはObservable.DeferAsync. 私のメソッドへの呼び出しとオブザーバブルの使用が次のようなものであると仮定します。

public async Task Observe()
{
    // NOT the real name of the interface, but explains it's role here.
    IObservableFactory factory;

    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");

    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));

    // Wait.
    await observable;
}

への呼び出しは最初のリクエストを送信し、それを読み取り、次に on への呼び出しは2 番目のサブスクリプションを作成しますが、別のオブザーバブルに作成するため、使用DeferAsyncはここでは機能しません。Subscribeawaitobservable

それとも、最終的にTask<IObservable<T>>、リアクティブ フレームワークでこれを行うための適切なメソッドを返していますか?

その後、メソッドは a を返すためTask<T>、 a を渡しCancellationTokenて操作をキャンセルすることをお勧めします。そうは言っても、オブザーバブルの作成CancellationTokenをキャンセルするために使用されていることは理解できますが、実際のオブザーバブルをキャンセルするためにも使用する必要があります(ストリームを読み取るために渡される可能性があるため)。

ここでは懸念事項の分離に違反しているだけでなく、キャンセルによる DRY 原則にも違反しているため、私の直感ではノーと言います。

  • 作成のキャンセルとオブザーバブルのキャンセルは、2 つの別個のものです。
  • 呼び出しは、サブスクリプションをキャンセルする実装Subscribeを返します。IDisposable
4

2 に答える 2

8

私は . を返しませんTask<IObservable<T>>。パブリック API でタスクとオブザーバブルを混在させると、混乱するだけです。タスクは、単一の値を生成するオブザーバブルと考えることができることに注意してください。これは、パブリック API で CancellationToken とオブザーバブルを混在させないことも意味します。サブスクライブおよびサブスクライブ解除することで、オブザーバブルを制御します。

それは、舞台裏でコンセプトを混ぜ合わせてはいけないという意味ではありません。Observable.Usingを使用しTask.ToObservableてやりたいことを行う方法は次のとおりです。CancellationDisposable

まず、以下を返すようにメソッドを変更しますTask<ISourceBlock<string>>

public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     return block;
}

上記のメソッドを使用する新しい Create メソッドを次に示します。

public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable is unsubscribed, use this token in your call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available
    return Observable.Using(() => new CancellationDisposable(),
           cd => CreateBlock(parameter, cd.Token)
               .ToObservable()
               .SelectMany(block => block.AsObservable()));
}

編集:Rxがすでにこのパターンを実装していることを発見しましたFromAsync

public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter, token))
                     .SelectMany(block => block.AsObservable());
}

また、実際に観察したい Observable (ブロックなど) を実際に作成しているDeferAsyncため、これはさらに適切です。Task

public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}
于 2013-04-01T03:04:05.923 に答える
2

BufferBlockonly after afterを作成する必要がある理由がわかりませんawaitBufferBlock代わりにできることは、を作成し、非同期初期化を開始してすぐに戻る同期メソッドを持つことです。何かのようなもの:

public IObservable<string> Create(
    string parameter, CancellationToken cancellationToken)
{
     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<string>();

     // Start asynchronous initialization, but don't wait for the result
     InitializeAsync(parameter, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

private async Task InitializeAsync(
    string parameter, ITargetBlock<string> block,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);
}

(おそらく、渡されたブロックをInitializeAsync()呼び出してエラーを処理することも必要になるでしょう。)Fault()

このように、Create()メソッドは単にIObservable<T>を返しますが、初期化も非同期で実行します。

それとも、最終的にTask<IObservable<T>>、リアクティブ フレームワークでこれを行うための適切なメソッドを返していますか?

私はそうは思わない。ここでは、2 レベルの非同期性は必要ないと思います。

作成のキャンセルとオブザーバブルのキャンセルは、2 つの別個のものです。

これは正確な要件によって異なりますが、一般的に、それらは別個のものではないと思います。操作をキャンセルしたいのですが、既に開始されているかどうかは関係ありません。

CancellationTokenこれは、ビヘイビアに渡される方法と似ています。実行を開始する前にキャンセルするためと、既に開始されている場合は適切なキャンセルを検出するためにTask.Run()使用されます。Task

呼び出しは、サブスクリプションをキャンセルする実装Subscribeを返します。IDisposable

はい、でもそれだけです。ここで説明しているのは、ホット オブザーバブル (オブザーバーに関係なくアイテムを生成する) が必要であるため、実際にはオブザーバブルはキャンセルされず、サブスクリプションのみがキャンセルされます。

于 2013-03-31T17:44:08.343 に答える