1

私は最近、一連のスレッドを生成し、ブロックして 1 つの応答 (最初に到着した応答) を待機し、残りのスレッドをキャンセルしてからブロックを解除できると便利なケースに遭遇しました。

たとえば、シード値を取る検索関数があるとします。検索関数は自明に並列化できると規定しましょう。さらに、検索空間には多くの潜在的な解が含まれており、シード値によっては関数が無期限に検索しますが、少なくとも 1 つのシード値が適切な時間内に解を生成します。

次のように、完全に素朴に、この検索を並行して実行できれば素晴らしいと思います。

let seeds = [|0..100|]
Array.Parallel.map(fun seed -> Search(seed)) seeds

残念ながら、Array.Parallel.mapすべてのスレッドが完了するまでブロックされます。残念。検索機能でいつでもタイムアウトを設定できますが、その場合、最も長く実行されているスレッドが終了するまで待つことはほぼ確実です。さらに、問題によっては、タイムアウトが十分に長くない場合があります。

要するに、select()任意の関数に対してのみ、UNIX ソケット呼び出しのようなものが欲しいのです。これは可能ですか?上記のようにかなりデータ並列の抽象化である必要はなく、F# コードである必要もありません。ネイティブ ライブラリを使用して、P/Invoke 経由で呼び出すこともできれば幸いです。

4

4 に答える 4

3
let rnd = System.Random()

let search seed = async {
    let t = rnd.Next(10000)
    //printfn "seed: %d  ms: %d" seed t
    do! Async.Sleep t
    return sprintf "seed %d finish" seed
}

let processResult result = async {
    //Todo:
    printfn "%s" result
}

let cts = new System.Threading.CancellationTokenSource()
let ignoreFun _ = () //if you don't want handle

let tasks = 
    [0..10]
    |> List.map (fun i ->
        async {
            let! result = search i
            do! processResult result
            cts.Cancel()
        }
    )

Async.StartWithContinuations(Async.Parallel tasks, ignoreFun, ignoreFun, ignoreFun, cts.Token)
于 2013-09-13T04:41:29.620 に答える
0

これは私にとってはうまくいくようでした

namespace CancellParallelLoops
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();

            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();

            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit");
            });

            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    if (num == 1000) cts.Cancel();
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }

            Console.ReadKey();
        }
    }
}
于 2013-09-12T19:48:18.803 に答える
0

イベントオブジェクトを使用してすべてのスレッドを同期してみてください。イベントを設定するソリューションが見つかったら、他のすべてのスレッドはイベントの状態を定期的にチェックし、すでに設定されている場合は実行を停止する必要があります。

詳細については、こちらをご覧ください。

于 2013-09-12T19:37:28.307 に答える