5

2 つの操作を実行する必要がある文字列のコレクションがあります。

これらの最初のものは、任意の順序で独立して安全に処理できますが (yay)、出力は元の順序で順次処理する必要があります (boo)。

次の Plinq は、ほとんどの方法で私をそこに連れて行きます。

myStrings.AsParallel().AsOrdered()
         .Select( str => Operation1(str) )
         .AsSequential()
         .Select( str => Operation2(str) );
//immagine Operation2() maintains some sort of state and must take the outputs from Operation1 in the original order    

これで大体のところはわかったのですが、問題は、AsOrdered() が原因で、最初にすべての文字列に対して Operation1 が実行され、次に結果要素が元の順序に並べ替えられ、最後に Operation2 の実行が開始されることです。

理想的には、最初の文字列 (つまり、最初に返された文字列ではなく、myStrings[0]) が Operation1 呼び出しによって返されたらすぐに、Operation2 がその作業を開始するようにしたいと考えています。

したがって、これは問題を一般的に解決するための私の試みです:

public static class ParallelHelper
{
    public static IEnumerable<U> SelectAsOrdered<T, U>(this ParallelQuery<T> query, Func<T, U> func)
    {
        var completedTasks = new Dictionary<int, U>();
        var queryWithIndexes = query.Select((x, y) => new { Input = x, Index = y })
                                    .AsParallel()
                                    .Select(t => new { Value = func(t.Input), Index = t.Index })
                                    .WithMergeOptions(ParallelMergeOptions.NotBuffered);

        int i = 0;
        foreach (var task in queryWithIndexes)
        {
            if (i==task.Index)
            {
                Console.WriteLine("immediately yielding task: {0}", i);
                i++;
                yield return task.Value;

                U previouslyCompletedTask;
                while (completedTasks.TryGetValue(i, out previouslyCompletedTask))
                {
                    completedTasks.Remove(i);
                    Console.WriteLine("delayed yielding task: {0}", i);
                    yield return previouslyCompletedTask;
                    i++;
                }
            }
            else
            {
                completedTasks.Add(task.Index, task.Value);
            }
        }
        yield break;
    }
}

次に、元のコード ブロックを次のように書き直すことができます。

myStrings.AsParallel()
         .SelectAsOrdered( str => Operation1(str) )
         .Select(str => Operation2(str));

Operation2 は、myStrings[0] が Operation1 から出るとすぐに開始されます。

私が知りたいのは:

  1. これは、並列化におけるかなり一般的な問題/パターンです。それとももっと簡単な方法がありますか?
  2. 上記の拡張メソッドは機能しているように見えますが、どのように改善できるのでしょうか? コード内に悪い考えのように見えるものはありますか?

ありがとう!

アンディ

興味がある場合に備えて:

  • .WithMergeOptions(ParallelMergeOptions.NotBuffered) への呼び出しがなければ、すべての Operation1 呼び出しが開始されるまで、Operation2 はその作業を開始しません (これは、すべてが完了するまで待機した元のコードよりも優れています)。

  • 実際の問題:
    操作 1 は、大量のテキスト内で法的な引用と参照を検索しています (例: "children act 1989")。
    これらの参照は通常独立していますが、時折、トランスクリプトに「前述の行為のセクション 6」のようなものが含まれることがあります。Operation2 は、Operation1 からのキャプチャに依存して、これらの部分参照を取得します。

4

1 に答える 1

0

速度が必要な場合は、すべてのプロセス(データのロード、データの準備、データの処理、データの集計)を並列化できます。生産者/消費者パターンを使用する方がよいと思います。

ただし、「Linq」を使用する場合、(完全な並列ワークフローを実行する良い方法で) データを並列として生成することはできません (ただし、はい: 準備、処理、および再開)。

一方、「Linq」を「並列(A)+順次(B)」として使用しようとすると、間違っていると思います(できます、はい)、あなたのプロセス(と思います)は

B = f(A)

その場合、B は A を待つ必要があります。

単純に「並列(A/B)」にしないのはなぜですか?

ヘルパー(拡張)を行うことはできますが、一般的には役に立たないと思います。

実際のケースではSemaphore、「記事 ID」への早すぎるアクセスを防ぐために a を使用するだけです。

準備、処理、および再開を並行して行う完全なコード (生成なし) は次のとおりです。

class Text {
    public static Regex rx = new Regex(@" (PREVID|ACTID\=([0-9]+)) ");

    private Text prv; // previous article
    private string ot; // original text
    private int id; // act id on text
    private Semaphore isComputed = new Semaphore(0, 1);

    public int ActID {
        get {
            isComputed.WaitOne();
            int _id = id;
            isComputed.Release();
            return _id;
        }
    }

    public bool ProcessText() {
        var mx = rx.Match(ot);
        var prev = mx.Groups [1].Value == "PREVID";
        if(prev)
            id = prv == null ? 0 : prv.ActID;
        else
            if(!int.TryParse(mx.Groups [2].Value, out id))
                throw new Exception(string.Format(@"Incorrect article id ""{0}""", mx.Groups [0].Value));
        isComputed.Release();
        return !prev;
    }

    public Text(string original_text, Text previous) {
        prv = previous;
        ot = original_text;
    }

}

public static void Main(String [] args) {

    // same random stream (for debugging)
    var rnd = new Random(1);

    var noise = @"These references are usually independent, but occasionally";

    // some noise text
    var bit = new Func<string>(() =>
        noise.Substring(0, rnd.Next(noise.Length)));

    // random article
    var text = new Func<string>(() =>
        string.Format(@"{0}{1}{2}", bit(),
            rnd.Next() % 2 == 0 ? " PREVID "
                                : string.Format(@" ACTID={0} ", rnd.Next()), bit()));

    // random data input
    var data = new List<Text>();
    Text prv = null;
    for(var n = 0; n < 1000000; n++)
        // producer / consumer is better to parallelize load data step
        data.Add(prv = new Text(text(), prv));

    Console.Write("Press key to start...");
    Console.ReadKey();

    // parallel processing
    Console.WriteLine("{0} unique ID's", data.AsParallel().Where(n => n.ProcessText()).Count());

    Console.WriteLine("Process completed.");
}

ご覧のとおり、ProcessTextすべての記事を並行して処理します。前の記事が独自の ID を計算するまで待機するのは、前の記事のみです。

この振る舞いを抽象化する本当の問題 (私が思う) は、アイテムの関係 (あるアイテムが別のアイテムに依存している) です。

プロデューサー/コンシューマー パターンを使用することをお勧めします。

于 2012-10-23T09:42:57.197 に答える