私の .NET 4.0 ライブラリには、ネットワーク経由でデータを送信し、応答を待つコードがあります。呼び出し元のコードをブロックしないために、メソッドはTask<T>
、応答を受信したときに完了する を返します。これにより、コードは次のようにメソッドを呼び出すことができます。
// Send the 'message' to the given 'endpoint' and then wait for the response
Task<IResult> task = sender.SendMessageAndWaitForResponse(endpoint, message);
task.ContinueWith(
t =>
{
// Do something with t.Result ...
});
基になるコードはTaskCompletionSourceを使用して、スレッドをスピンアップせずに応答メッセージを待機できるようにします。
private readonly Dictionary<int, TaskCompletionSource<IResult>> m_TaskSources
= new Dictionary<int, TaskCompletionSource<IResult>>();
public Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message)
{
var source = new TaskCompletionSource<IResult>(TaskCreationOptions.None);
m_TaskSources.Add(endpoint, source);
// Send the message here ...
return source.Task;
}
応答が受信されると、次のように処理されます。
public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
if (m_TaskSources.ContainsKey(endpoint))
{
var source = m_TaskSources[endpoint];
source.SetResult(value);
m_TaskSources.Remove(endpoint);
}
}
ここで、タイムアウトを追加して、呼び出し元のコードが応答を無期限に待機しないようにします。ただし、タスクをタイムアウトする簡単な方法がないため、.NET 4.0 ではやや面倒です。そこで、Rx ならもっと簡単にできるのではないかと考えていました。だから私は次のことを思いついた:
private readonly Dictionary<int, Subject<IResult>> m_SubjectSources
= new Dictionary<int, Subject<IResult>>();
private Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message, TimeSpan timeout)
{
var source = new Subject<IResult>();
m_SubjectSources.Add(endpoint, source);
// Send the message here ...
return source.Timeout(timeout).ToTask();
}
public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
if (m_SubjectSources.ContainsKey(endpoint))
{
var source = m_SubjectSources[endpoint];
source.OnNext(value);
source.OnCompleted();
m_SubjectSources.Remove(endpoint);
}
}
これはすべて問題なく機能しているように見えますが、避けるSubject
べきであるという質問をいくつか見たので、目標を達成するための Rx-y の方法がもっとあるかどうか疑問に思っています。