13

いくつかの異なるサーバーを並行してWebサービス呼び出しを行うコードを作成しようとしているので、TPLを使用するのは当然の選択のようです。

私のWebサービス呼び出しの1つだけが私が望む結果を返すことはあり、他のすべては返しません。私は、条件に一致するTask.WaitAny最初のものが戻ったときにのみブロックを解除することを効果的に行う方法を模索しています。Task

試してみましWaitAnyたが、フィルターをどこに置くかわかりませんでした。私はここまで来ました:

public void SearchServers()
{
    var servers = new[] {"server1", "server2", "server3", "server4"};
    var tasks = servers
                 .Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s))
                 .ToArray();

    Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?

    //Omitted: cancel any outstanding tasks since the correct server has been found
}

private bool CallServer(string server)
{
    //... make the call to the server and return the result ...
}

編集:上記の混乱がある場合に備えて、簡単に説明します。私は次のことをしようとしています:

  1. サーバーごとに、を開始しTaskて確認します
  2. いずれか、サーバーがtrueを返すまで待ち​​ます(最大1台のサーバーのみがtrueを返します)
  3. または、すべてのサーバーがfalseを返すまで、つまり一致するものがなくなるまで待ちます。
4

4 に答える 4

10

私が考えることができる最善の方法は、ContinueWithfor eachを指定しTask、結果を確認trueし、他のタスクをキャンセルする場合です。タスクをキャンセルするには、CancellationTokenを使用できます。

var tasks = servers
    .Select(s => Task.Run(...)
        .ContinueWith(t =>
            if (t.Result) {
                // cancel other threads
            }
        )
    ).ToArray();

更新: 別の解決策はWaitAny、適切なタスクが完了するまで行うことです (ただし、リストから完了したタスクを削除し、残りのタスクから新しい配列を作成するなど、いくつかの欠点があります)。

List<Task<bool>> tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s)).ToList();

bool result;
do {
    int idx = Task.WaitAny(tasks.ToArray());
    result = tasks[idx].Result;
    tasks.RemoveAt(idx);
} while (!result && tasks.Count > 0);

// cancel other tasks

更新 2: 最近では Rx でそれを行います:

[Fact]
public async Task AwaitFirst()
{
    var servers = new[] { "server1", "server2", "server3", "server4" };
    var server = await servers
        .Select(s => Observable
            .FromAsync(ct => CallServer(s, ct))
            .Where(p => p)
            .Select(_ => s)
        )
        .Merge()
        .FirstAsync();
    output.WriteLine($"Got result from {server}");
}

private async Task<bool> CallServer(string server, CancellationToken ct)
{
    try
    {
        if (server == "server1")
        {
            await Task.Delay(TimeSpan.FromSeconds(1), ct);
            output.WriteLine($"{server} finished");
            return false;
        }
        if (server == "server2")
        {
            await Task.Delay(TimeSpan.FromSeconds(2), ct);
            output.WriteLine($"{server} finished");
            return false;
        }
        if (server == "server3")
        {
            await Task.Delay(TimeSpan.FromSeconds(3), ct);
            output.WriteLine($"{server} finished");
            return true;
        }
        if (server == "server4")
        {
            await Task.Delay(TimeSpan.FromSeconds(4), ct);
            output.WriteLine($"{server} finished");
            return true;
        }
    }
    catch(OperationCanceledException)
    {
        output.WriteLine($"{server} Cancelled");
        throw;
    }

    throw new ArgumentOutOfRangeException(nameof(server));
}

私のマシンではテストに 3.32 秒かかり (つまり、4 番目のサーバーを待機しなかったことを意味します)、次の出力が得られました。

server1 finished
server2 finished
server3 finished
server4 Cancelled
Got result from server3
于 2013-02-06T10:38:49.383 に答える
4

完了するとタスクを返すOrderByCompletion()AsyncEx ライブラリから使用できます。コードは次のようになります。

var tasks = servers
    .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s))
    .OrderByCompletion();

foreach (var task in tasks)
{
    if (task.Result)
    {
        Console.WriteLine("found");
        break;
    }
    Console.WriteLine("not found yet");
}

// cancel any outstanding tasks since the correct server has been found
于 2013-02-06T11:11:53.450 に答える
1

Interlocked.CompareExchange を使用すると、それが実行されます。1 つのタスクのみが serverReturedData に書き込むことができます。

    public void SearchServers()
        {
            ResultClass serverReturnedData = null;
            var servers = new[] {"server1", "server2", "server3", "server4"};
            var tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => 
            {
               var result = CallServer((string)server), s);
               Interlocked.CompareExchange(ref serverReturnedData, result, null);

            }).ToArray();

            Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?
        //
        // use serverReturnedData as you want.
        // 
        }

編集: Jasd が言ったように、変数 serverReturnedData が有効な値を持つ前に上記のコードが返される可能性があり (サーバーが null 値を返す場合、これが発生する可能性があります)、結果をカスタム オブジェクトに確実にラップできるようにします。

于 2013-02-06T10:41:42.190 に答える
1

svick's answer に基づく一般的な解決策は次のとおりです。

public static async Task<T> GetFirstResult<T>(
this IEnumerable<Func<CancellationToken, Task<T>>> taskFactories, 
Action<Exception> exceptionHandler,
Predicate<T> predicate)
{
    T ret = default(T);
    var cts = new CancellationTokenSource();
    var proxified = taskFactories.Select(tf => tf(cts.Token)).ProxifyByCompletion();
    int i;
    for (i = 0; i < proxified.Length; i++)
    {
        try
        {
            ret = await proxified[i].ConfigureAwait(false);
        }
        catch (Exception e)
        {
            exceptionHandler(e);
            continue;
        }
        if (predicate(ret))
        {
            break;
        }
    }

    if (i == proxified.Length)
    {
        throw new InvalidOperationException("No task returned the expected value");
    }
    cts.Cancel(); //we have our value, so we can cancel the rest of the tasks
    for (int j = i+1; j < proxified.Length; j++)
    {
        //observe remaining tasks to prevent process crash 
        proxified[j].ContinueWith(
         t => exceptionHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
                   .Forget();
    }
    return ret;
}

ProxifyByCompletionは次のように実装されています。

public static Task<T>[] ProxifyByCompletion<T>(this IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToArray();
    var buckets = new TaskCompletionSource<T>[inputTasks.Length];
    var results = new Task<T>[inputTasks.Length];
    for (int i = 0; i < buckets.Length; i++)
    {
        buckets[i] = new TaskCompletionSource<T>();
        results[i] = buckets[i].Task;
    }
    int nextTaskIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        inputTask.ContinueWith(completed =>
        {
            var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted)
            {
                Trace.Assert(completed.Exception != null);
                bucket.TrySetException(completed.Exception.InnerExceptions);
            }
            else if (completed.IsCanceled)
            {
                bucket.TrySetCanceled();
            }
            else
            {
                bucket.TrySetResult(completed.Result);
            }
        }, CancellationToken.None, 
           TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }
    return results;
}

CS4014Forgetを抑制するための空のメソッドです。

public static void Forget(this Task task) //suppress CS4014
{
}
于 2014-12-27T18:08:17.780 に答える