2

私の .NET 4.0 ライブラリには、ネットワーク経由でデータを送信し、応答を待つコードがあります。呼び出し元のコードをブロックしないために、メソッドはTask<T>、応答を受信したときに完了する を返します。これにより、コードは次のようにメソッドを呼び出すことができます。

// Send the 'message' to the given 'endpoint' and then wait for the response
Task<IResult> task = sender.SendMessageAndWaitForResponse(endpoint, message);
task.ContinueWith(
    t => 
    {
        // Do something with t.Result ...
    });

基になるコードはTaskCompletionSourceを使用して、スレッドをスピンアップせずに応答メッセージを待機できるようにします。

private readonly Dictionary<int, TaskCompletionSource<IResult>> m_TaskSources
    = new Dictionary<int, TaskCompletionSource<IResult>>();

public Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message)
{
    var source = new TaskCompletionSource<IResult>(TaskCreationOptions.None);
    m_TaskSources.Add(endpoint, source);

    // Send the message here ...

    return source.Task;
}

応答が受信されると、次のように処理されます。

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_TaskSources.ContainsKey(endpoint))
    {
        var source = m_TaskSources[endpoint];
        source.SetResult(value);
        m_TaskSources.Remove(endpoint);
    }
}

ここで、タイムアウトを追加して、呼び出し元のコードが応答を無期限に待機しないようにします。ただし、タスクをタイムアウトする簡単な方法がないため、.NET 4.0 ではやや面倒です。そこで、Rx ならもっと簡単にできるのではないかと考えていました。だから私は次のことを思いついた:

private readonly Dictionary<int, Subject<IResult>> m_SubjectSources
    = new Dictionary<int, Subject<IResult>>();

private Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message, TimeSpan timeout)
{
    var source = new Subject<IResult>();
    m_SubjectSources.Add(endpoint, source);

    // Send the message here ...

    return source.Timeout(timeout).ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_SubjectSources.ContainsKey(endpoint))
    {
        var source = m_SubjectSources[endpoint];
        source.OnNext(value);
        source.OnCompleted();
        m_SubjectSources.Remove(endpoint);
    }
}

これはすべて問題なく機能しているように見えますが、避けるSubjectべきであるという質問をいくつか見たので、目標を達成するための Rx-y の方法がもっとあるかどうか疑問に思っています。

4

1 に答える 1

3

SubjectRx での使用を避けるべきというアドバイスは、しばしば誇張されています。Rx にはイベントのソースが必要Subjectです。

Subject の問題は、一般に、他の方法では結合できる 2 つの Rx クエリ間で使用される場合、または既に明確に定義された変換IObservable<T>(Observable.FromEventXXXまたはObservable.FromAsyncXXXなど) が存在する場合に発生します。

必要に応じて、以下のアプローチでDictionary複数の and を廃止できます。Subjectこれは単一のサブジェクトを使用し、フィルタリングされたクエリをクライアントに返します。

それ自体は「より良い」というわけではありません。これが理にかなっているかどうかは、シナリオの詳細に依存しますが、多くのサブジェクトの生成を節約し、単一のストリームですべての結果を監視するための優れたオプションを提供します. 結果を (たとえばメッセージ キューから) シリアルにディスパッチする場合、これは理にかなっています。

// you only need to synchronize if you are receiving results in parallel
private readonly ISubject<Tuple<int,IResult>, Tuple<int,IResult>> results =
    Subject.Synchronize(new Subject<Tuple<int,IResult>>());

private Task<IResult> SendMessageAndWaitForResponse(
    int endpoint, object message, TimeSpan timeout)
{           
    // your message processing here, I'm just echoing a second later
    Task.Delay(TimeSpan.FromSeconds(1)).ContinueWith(t => {
        CompleteWaitForResponseResponse(endpoint, new Result { Value = message }); 
    });

    return results.Where(r => r.Item1 == endpoint)
                  .Select(r => r.Item2)
                  .Take(1)
                  .Timeout(timeout)
                  .ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    results.OnNext(Tuple.Create(endpoint,value));
}

次のような結果のクラスを定義した場所:

public class Result : IResult
{
    public object Value { get; set; }
}

public interface IResult
{
    object Value { get; set; }
}

編集 - コメントの追加の質問への回答。

  • 単一のサブジェクトを破棄する必要はありません。リークすることはなく、スコープ外になるとガベージ コレクションが行われます。

  • ToTaskはキャンセル トークンを受け入れますが、これは実際にはクライアント側からのキャンセル用です。

  • results.OnError(exception);リモート側が切断された場合は、新しいサブジェクト インスタンスを同時にインスタンス化することで、すべてのクライアントにエラーを送信できます。

何かのようなもの:

private void OnRemoteError(Exception e)
{
    results.OnError(e);        
}

これは、予期された方法ですべてのクライアントに失敗したタスクとして現れます。

以前に送信したサブジェクトにサブスクライブしているクライアントはOnErrorすぐにエラーを返すため、かなりスレッドセーフでもあります。その時点から死んでいます。次に、準備ができたら、次の方法で再初期化できます。

private void OnInitialiseConnection()
{
    // ... your connection logic

    // reinitialise the subject...
    results = Subject.Synchronize(new Subject<Tuple<int,IResult>>());
}

個々のクライアント エラーについては、次のことを考慮することができます。

  • インターフェイスを拡張してIResultエラーをデータとして含める
  • その後、必要に応じて、Rx クエリを で拡張することにより、そのクライアントだけの障害にこれを投影できますSendMessageAndWaitForResponse。たとえば、IResult に Exception および HasError プロパティを追加すると、次のようなことができます。

    return results.Where(r => r.Item1 == endpoint)
                .SelectMany(r => r.Item2.HasError
                    ? Observable.Throw<IResult>(r.Item2.Exception)
                    : Observable.Return(r.Item2))
                .Take(1)
                .Timeout(timeout)
                .ToTask();
    
于 2014-04-03T09:49:02.477 に答える