私は、 Reactive Extensions for Twitter のストリーミング APIIObservable<T>
を使用して実装の作成に取り組んできました。
高レベルから HTTP 要求が送信され、接続が開かれたままになります。長さがプレフィックスされたアイテムは、消費するために送信されます。
基本的に、これはキーワードStream.ReadAsync
を使用するループ内の呼び出しです。インターフェースの実装 ( Dataflow ライブラリからのまたはブロック、それは問題ではありません。これは実装の詳細です) がこのループに渡され、実装のメソッドが呼び出され、オブザーバブルが生成されます。await
IObserver<T>
Observable.Create
IObserver<T>
ただし、このループが処理を開始する前にTask<T>
、-returning メソッドの呼び出しを必要とする多くのことを行う必要があります。これらはすべて、await
キーワードを使用して C# 5.0 でより簡単に呼び出すことができます。このようなもの:
public async Task<IObservable<string>> Create(string parameter,
CancellationToken cancellationToken)
{
// Make some call that requires await.
var response = await _httpClient.SendAsync(parameter, cancellationToken).
ConfigureAwait(false);
// Create a BufferBlock which will be the observable.
var block = new BufferBlock<T>();
// Start some background task which will use the block and publish to it
// on a separate task. This is not something that is awaited for.
ConsumeResponse(response, block, cancellationToken);
// Create the observable.
return block.AsObservable();
}
とは言っても、私は現在メソッドから a を返していますが、必要な呼び出しを容易にするためTask<IObservable<T>>
に使用できる Reactive Extensions に何かが欠けていると感じていますが、 a の代わりに a を返すこともできます。await
IObservable<T>
Task<IObservable<T>>
Reactive Extensions のどのメソッドを使用すると、作成メソッドから戻る前にメソッドを待機する必要があるオブザーバブルを作成できますか?
私が見つけた最も近いものはObservable.DeferAsync
. 私のメソッドへの呼び出しとオブザーバブルの使用が次のようなものであると仮定します。
public async Task Observe()
{
// NOT the real name of the interface, but explains it's role here.
IObservableFactory factory;
// Create is really named something else.
IObservable<string> observable = factory.Create("parameter");
// Subscribe.
observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));
// Wait.
await observable;
}
への呼び出しは最初のリクエストを送信し、それを読み取り、次に on への呼び出しは2 番目のサブスクリプションを作成しますが、別のオブザーバブルに作成するため、使用DeferAsync
はここでは機能しません。Subscribe
await
observable
それとも、最終的にTask<IObservable<T>>
、リアクティブ フレームワークでこれを行うための適切なメソッドを返していますか?
その後、メソッドは a を返すためTask<T>
、 a を渡しCancellationToken
て操作をキャンセルすることをお勧めします。そうは言っても、オブザーバブルの作成CancellationToken
をキャンセルするために使用されていることは理解できますが、実際のオブザーバブルをキャンセルするためにも使用する必要があります(ストリームを読み取るために渡される可能性があるため)。
ここでは懸念事項の分離に違反しているだけでなく、キャンセルによる DRY 原則にも違反しているため、私の直感ではノーと言います。
- 作成のキャンセルとオブザーバブルのキャンセルは、2 つの別個のものです。
- 呼び出しは、サブスクリプションをキャンセルする実装
Subscribe
を返します。IDisposable