5

並列または順番に実行したい 2 つの .net Task オブジェクトがあります。どちらの場合でも、スレッドをブロックして待機させたくありません。結局のところ、Reactive Extensionsによって、並列のストーリーが単純に美しくなります。しかし、タスクを順番に並べようとすると、コードは機能しますが、ぎこちなく感じます。

逐次バージョンをより簡潔にする方法や、並列バージョンと同じくらい簡単にコーディングする方法を誰かが示すことができるかどうか知りたいです。この質問への回答にリアクティブ拡張機能を使用する必要はありません。

参考までに、並列処理と順次処理の両方に対する私の 2 つのソリューションを次に示します。

並列処理版

これは純粋な喜びです。

    public Task<string> DoWorkInParallel()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
        Task<bool> BravoTask = Task.Factory.StartNew(() => true);

        //Prepare for Rx, and set filters to allow 'Zip' to terminate early
        //in some cases.
        IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
        IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

        Observable
            .Zip(
                AsyncAlpha,
                AsyncBravo,
                (x, y) => y.ToString() + x.ToString())
            .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
                (x) => { result.TrySetResult(x); },
                (x) => { result.TrySetException(x.GetBaseException()); },
                () => { result.TrySetResult("Nothing"); });

        return result.Task;
    }

順次/パイプライン処理バージョン

これは機能しますが、不器用です:

    public Task<string> DoWorkInSequence()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);

        AlphaTask.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                result.TrySetException(x.Exception.GetBaseException());
            }
            else
            {
                if (x.Result != 5)
                {
                    Task<bool> BravoTask = Task.Factory.StartNew(() => true);
                    BravoTask.ContinueWith(y =>
                    {
                        if (y.IsFaulted)
                        {
                            result.TrySetException(y.Exception.GetBaseException());
                        }
                        else
                        {
                            if (y.Result)
                            {
                                result.TrySetResult(x.Result.ToString() + y.Result.ToString());
                            }
                            else
                            {
                                result.TrySetResult("Nothing");
                            }
                        }
                    });
                }
                else
                {
                    result.TrySetResult("Nothing");
                }
            }
        }
        );

        return result.Task;
    }

上記のシーケンシャル コードでは、ごちゃごちゃになっていて、並列バージョンに合わせて タイムアウト機能を追加していません。

要件(8/6更新)

回答者は、次の点に注意してください。

  1. シーケンシャル シナリオでは、最初のタスクの出力が 2 番目のタスクの入力に供給される配置を許可する必要があります。上記の私のサンプルの「ぎこちない」コードは、それを実現するために簡単に配置できたはずです。

  2. 私は.net 4.5の回答に興味がありますが、.net 4.0の回答は私にとって同等以上に重要です。

  3. タスク「アルファ」と「ブラボー」には、完了までに合わせて 200 ミリ秒の時間制限があります。それぞれ200ミリ秒ありません。これはシーケンシャルの場合にも当てはまります。

  4. いずれかのタスクが無効な結果を返す場合、SourceCompletionTask は、両方のタスクが完了する前に早期に完了する必要があります。サンプル コードの明示的なテストで示されているように、無効な結果は [AlphaTask:5] または [BravoTask:false] のいずれかです。
    更新 8/8: 明確化- シーケンシャルの場合、AlphaTask が成功しない場合、またはタイムアウトが既に発生している場合、BravoTask はまったく実行されません。

  5. AlphaTask と BravoTask の両方がブロックできないと仮定します。それは問題ではありませんが、私の現実のシナリオでは、実際には非同期の WCF サービス呼び出しです。

シーケンシャル バージョンをクリーンアップするために利用できた Rx の側面があるのか​​もしれません。しかし、タスクプログラミングだけでも、私が想像するより良い話があるはずです。見てみましょう。

正誤表 両方のコード サンプルで、 TaskCompletionSourceを返すべきではなかったポスターの回答が非常に正しかったため、戻り値の型を Task に変更しました。

4

4 に答える 4

4

async/await を使用できる場合、Brandon は適切な回答を提供しています。まだ VS2010 を使用している場合、順次バージョンをクリーンアップするために私が最初に行うことは、ThenStephen Toub がブログ投稿で説明したメソッドのような拡張メソッドを取得することです。Task.FromResult.NET 4.5 を使用していない場合は、メソッドも実装します。それらを使用すると、次のものが得られます。

public Task<string> DoWorkInSequence()
{
    return Task.FromResult(4)
           .Then(x => 
                 { if (x != 5)
                   {
                       return Task.FromResult(true)
                              .Then(y => 
                                    { if (y)
                                      {
                                          return Task.FromResult(x.ToString() + y.ToString());
                                      }
                                      else
                                      {
                                          return Task.FromResult("Nothing");
                                      }
                                    });
                    }
                    else
                    {
                        return Task.FromResult("Nothing");
                    }
                 });
}

また、通常は、TaskCompletionSource (TaskCompletionSource を呼び出すことで取得できます) の代わりに Task を返す必要があります。これは.Task、呼び出し元に返すタスクの結果を設定してほしくないためです。

Brandon の回答は、タイムアウト機能を実装する良い方法も提供します (async/await キーワードの欠如を調整します)。

EDIT 矢印コードを減らすために、より多くのLINQメソッドを実装できます。SelectMany の実装は、以前にリンクされたブログ投稿で提供されています。LINQ に必要な他のメソッドは、Select と Where です。Then と SelectMany を実行すれば、これらは非常に簡単なはずですが、次のようになります。

public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
    if (task == null) throw new ArgumentNullException("task");
    if (predicate == null) throw new ArgumentNullException("predicate");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
        {
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    if (predicate(completed.Result))
                        tcs.TrySetResult(completed.Result);
                    else
                        tcs.TrySetCanceled();
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            }
        });
    return tcs.Task;
}

public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
    if (task == null) throw new ArgumentNullException("task");
    if (selector == null) throw new ArgumentNullException("selector");

    var tcs = new TaskCompletionSource<TResult>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                tcs.TrySetResult(selector(completed.Result));
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    });
    return tcs.Task;
}

その後、1 つの最後の非 LINQ 拡張メソッドを使用すると、キャンセル時に既定値を返すことができます。

public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
    if (task == null) throw new ArgumentNullException("task");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
        else tcs.TrySetResult(completed.Result);
    });
    return tcs.Task;
}

そして新しく改善された DoWork (sans timeout):

public static Task<string> DoWorkInSequence()
{
    return (from x in Task_FromResult(5)
            where x != 5
            from y in Task_FromResult(true)
            where y
            select x.ToString() + y.ToString()
           ).IfCanceled("Nothing");
}

Brandon's answer の Timeout メソッド (必要に応じて async/await なしで一度書き直されます) は、チェーンの最後で全体的なタイムアウトのために、および/またはチェーンの各ステップの後にスタックすることができます。全体のタイムアウトに達しました。チェーン中断のもう 1 つの可能性は、すべての個々のステップでキャンセル トークンを取得し、Timeout メソッドを変更して CancellationTokenSource を取得し、タイムアウトが発生した場合にそれをキャンセルし、タイムアウト例外をスローすることです。

編集(ブレント・アリアス)

あなたが提示したものから素晴らしいアイデアを取り入れて、私は自分の視点からの最終的な答えだと思うものを考案しました. ParallelExtensionsExtrasの nuget パッケージにある .net 4.0 拡張メソッドに基づいています。以下のサンプルでは、​​3 番目のタスクを追加して、私が述べた要件を考慮して、シーケンシャル タスクのプログラミングの "感覚" を説明するのに役立てています。

public Task<string> DoWorkInSequence()
{
    var cts = new CancellationTokenSource();

    Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });

    Task<int> AlphaTask = Task.Factory
        .StartNew(() => 4 )
        .Where(x => x != 5 && !cts.IsCancellationRequested);

    Task<bool> BravoTask = AlphaTask
        .Then(x => true)
        .Where(x => x && !cts.IsCancellationRequested);

    Task<int> DeltaTask = BravoTask
        .Then(x => 7)
        .Where(x => x != 8);

    Task<string> final = Task.Factory
        .WhenAny(DeltaTask, timer)
        .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
            ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");

    //This is here just for experimentation.  Placing it at different points
    //above will have varying effects on what tasks were cancelled at a given point in time.
    cts.Cancel();

    return final;
}

この議論と共同作業で私が行ったいくつかの重要な観察事項があります。

  • "Then" 拡張機能の使用は、些細な場合には便利ですが、適用範囲が限定されていることは注目に値します。より複雑なケースでは、たとえば に置き換える必要があり.ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default)ます。前述のシナリオで「Then」を「ContinueWith」に置き換える場合、OnlyOnRanToCompletionオプションを追加することが重要です。
  • タイムアウト拡張機能を使用しても、最終的には私のシナリオでは機能しません。これは、シーケンス内のすべての先行タスク インスタンスをキャンセルするのではなく、すぐにアタッチされているタスクのみをキャンセルするためです。これが、タクティックに切り替えて、各節StartNewDelayed(...)にエクスプレス キャンセル チェックを追加した理由です。Where
  • ParallelExtensionsExtras ライブラリには、あなたが使用したLINQ to Tasksが定義されていますが、タスクで LINQ 風の外観を避けるのが最善であると結論付けました。これは、LINQ を使用するタスクが非常に難解であるためです。平均的な開発者を混乱させる可能性があります。彼らに非同期コーディングを理解させるのは非常に困難です。LINQ to Tasks の著者でさえ、「この LINQ 実装が実際にどれほど役立つかは議論の余地がありますが、少なくとも、興味深い思考演習を提供します」と述べています。はい、同意しました。興味深い思考演習です。もちろん、少なくとも "Where" LINQ to Tasks メソッドは、上記のソリューションで重要な役割を果たしたことを認めなければなりません。
于 2013-08-06T03:53:36.280 に答える
1
public Task<string> DoWorkInSequence()
{
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    Func<int> BravoFunc = x => 2 * x;

    //Prepare for Rx, and set filters to allow 'Zip' to terminate early
    //in some cases.
    IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);

    return AsyncAlpha
        .Do(x => Console.WriteLine(x))  //This is how you "Do WORK in sequence"
        .Select(BravoFunc)              //This is how you map results from Alpha
                                        //via a second method.
        .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
            (x) => { result.TrySetResult(x); },
            (x) => { result.TrySetException(x.GetBaseException()); },
            () => { result.TrySetResult("Nothing"); }).ToTask();
}

ただし、最終的には、タスクが必要な場合は TPL ですべてを行うか、使用するObservable.ToTask(this IObservable<T> observable)のではなく使用しますTaskCompletionSource

于 2013-08-06T03:09:51.740 に答える
1

Aron 持っていました それ ほぼ スポット オン

public Task<string> DoWorkSequentially()
{
   Task<int> AlphaTask = Task.Run(() => 4);    //Some work;
   Task<bool> BravoTask = Task.Run(() => true);//Some other work;

   //Prepare for Rx, and set filters to allow 'Zip' to terminate early
   //in some cases.
   IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
   IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

    return (from alpha in AsyncAlpha
           from bravo in AsyncBravo
           select bravo.ToString() + alpha.ToString())
       .Timeout(TimeSpan.FromMilliseconds(200))
       .Concat(Observable.Return("Nothing"))   //Return Nothing if no result
       .Take(1)
       .ToTask();
}

ここでは、BravoFunc後ろを に置きましたBravoTaskTaskCompletionSource私は(アロンがしたように)を削除しました。最後に、ToTask()演算子を使用して Rx 継続を a に戻しTask<string>ます。

ご了承ください

    from alpha in AsyncAlpha
    from bravo in AsyncBravo
    select bravo.ToString() + alpha.ToString()

のように書くこともできる.

    AsyncAlpha.SelectMany(a=>AsyncBravo.Select(b=> b.ToString() + a.ToString()))

SelectMany 演算子は、これらの種類の継続に非常に便利です。クエリ内包表記構文では、最後の select 句でbravoandにアクセスできるため、さらに便利です。alpha

ご覧のように、継続が多くなると、これは非常に役立ちます。たとえば、3 つまたは 4 つの継続が必要な例を考えてみましょう

    from a in Alpha
    from b in Bravo
    from c in Charlie
    from d in Delta
    select a+b+c+d

これには、実際のアプリケーションも含まれています。これはよくあるパターンだと思います。いくつかの例が含まれます。サーバーが接続されるのを待ってから、セッション トークンを取得してサービス クライアントに渡します。

    from isConnected in _server.ConnectionState.Where(c=>c)
    from session in _server.GetSession()
    from customer in _customerServiceClient.GetCustomers(session)
    select customer;

またはおそらく、認証が必要なソーシャル メディア フィードで、連絡先を見つけ、メールのリストを取得し、これらのメールの最初の 20 ヘッダーをプルダウンします。

    from accessToken in _oauth.Authenticate()
    from contact in _contactServiceClient.GetContact(emailAddress, accessToken)
    from imapMessageId in _mailServiceClient.Search(contact).Take(20)
    from email in _mailServiceClient.GetEmailHeaders(imapMessageId)
    select email;
于 2013-08-08T09:05:14.770 に答える
1

まず、私は を返しませんでしたTaskCompletionSource。これは目的を達成するための手段です...公開 API から隠されるべきメソッドの実装の詳細。メソッドは代わりに a を返す必要がありますTask(単に を返す必要がありresult.Taskます)。

いずれにせよ、単にタスクを処理している場合は、Rx を使用せずに TPL を使用する必要があります。実際にタスクを他の rx コードと統合する必要がある場合にのみ、Rx を使用してください。DoWorkInParallelRx のものを混ぜない場合でも、はるかに簡単にすることができます。Rx は、複雑な Task をうまく処理します。しかし、あなたが説明しているシナリオは比較的単純で、TPL で簡単に解決できます。

TPL で並列バージョンと順次バージョンの両方を実行する方法は次のとおりです。

/// <summary>Extension methods for timing out tasks</summary>
public static class TaskExtensions
{
    /// <summary> throws an error if task does not complete before the timer.</summary>
    public static async Task Timeout(this Task t, Task timer)
    {
        var any = await Task.WhenAny(t, timer);
        if (any != t)
        {
           throw new TimeoutException("task timed out");
        }
    }

    /// <summary> throws an error if task does not complete before the timer.</summary>
    public static async Task<T> Timeout<T>(this Task<T> t, Task timer)
    {
        await Timeout((Task)t, timer);
        return t.Result;
    }

    /// <summary> throws an error if task does not complete in time.</summary>
    public static Task Timeout(this Task t, TimeSpan delay)
    {
        return t.IsCompleted ? t : Timeout(t, Task.Delay(delay));
    }

    /// <summary> throws an error if task does not complete in time.</summary>
    public static Task<T> Timeout<T>(this Task<T> t, TimeSpan delay)
    {
        return Timeout((Task)t, delay);
    }
}

// .. elsewhere ..
public async Task<string> DoWorkInParallel()
{
    var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
    var alphaTask = Task.Run(() => 4);
    var betaTask = Task.Run(() => true);

    // wait for one of the tasks to complete
    var t = await Task.WhenAny(alphaTask, betaTask).Timeout(timer);

    // exit early if the task produced an invalid result
    if ((t == alphaTask && alphaTask.Result != 5) ||
        (t == betaTask && !betaTask.Result)) return "Nothing";

    // wait for the other task to complete
    // could also just write: await Task.WhenAll(alphaTask, betaTask).Timeout(timer);
    await ((t == alphaTask) ? (Task)betaTask : (Task)alphaTask).Timeout(timer);

    // unfortunately need to repeat the validation logic here.
    // this logic could be moved to a helper method that is just called in both places.
    var alpha = alphaTask.Result;
    var beta = betaTask.Result;
    return (alpha != 5 && beta) ? (alpha.ToString() + beta.ToString()) : "Nothing";
}

public async Task<string> DoWorkInSequence()
{
    var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
    var alpha = await Task.Run(() => 4).Timeout(timer);
    if (alpha != 5)
    {
        var beta = await Task.Run(() => true).Timeout(timer);
        if (beta)
        {
            return alpha.ToString() + beta.ToString();
        }
    }

    return "Nothing";
}

.Net 4.0 で作業を行う必要がある場合は、VS2012 コンパイラを使用して .Net 4.0 をターゲットにし、 async/await を引き続き使用できる Microsoft.Bcl.Async nuget パッケージを使用できます。この SO の質問を参照してください: Using async-await on .net 4

編集:タスクが無効な値を生成する場合、並列バージョンと順次バージョンの両方で早期に終了するようにコードを変更し、タイムアウトをタスクごとではなく組み合わせて変更しました。シーケンシャルの場合ですが、このタイマーは2 つのタスクの時間もカウントします。

于 2013-08-06T03:44:13.003 に答える