8

(各フローで)順番に処理する必要がある複数のフローにまたがる依存タスクをキューに入れたい。フローは並行して処理できます。

具体的には、2つのキューが必要であり、各キューのタスクを順番に処理したいとします。望ましい動作を説明するためのサンプル擬似コードを次に示します。

Queue1_WorkItem wi1a=...;

enqueue wi1a;

... time passes ...

Queue1_WorkItem wi1b=...;

enqueue wi1b; // This must be processed after processing of item wi1a is complete

... time passes ...

Queue2_WorkItem wi2a=...;

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b

... time passes ...

Queue1_WorkItem wi1c=...;

enqueue wi1c; // This must be processed after processing of item wi1b is complete

これは、作業項目間の依存関係を示す矢印の付いた図です。

ここに画像の説明を入力してください

問題は、C#4.0 /.NET4.0を使用してこれをどのように行うかです。現在、キューごとに1つずつ、合計2つのワーカースレッドがあり、キューBlockingCollection<>ごとにを使用しています。代わりに、.NETスレッドプールを活用して、ワーカースレッドにアイテムを同時に(フロー全体で)処理させたいのですが、フロー内でシリアルに処理させたいと思います。言い換えると、たとえば、wi1bが到着したときに、完了を追跡してwi1aを覚えておく必要なしに、wi1bがwi1aの完了に依存していることを示したいと思います。つまり、「queue1の作業項目を送信したいのですが、これは、すでにqueue1に送信した他の項目と連続して処理されますが、他のキューに送信された作業項目と並行して処理される可能性があります」。

この説明が理にかなっていることを願っています。そうでない場合は、コメントで質問してください。それに応じてこの質問を更新します。

読んでくれてありがとう。

アップデート:

これまでの「欠陥のある」解決策を要約すると、私が使用できない回答セクションの解決策と、それらを使用できない理由は次のとおりです。

TPLタスクでは、の先行タスクを指定する必要がありますContinueWith()。新しいタスクを送信するときに、各キューの先行タスクの知識を維持したくありません。

TDF ActionBlocksは有望に見えましたが、ActionBlockに投稿されたアイテムは並行して処理されているように見えます。特定のキューのアイテムをシリアルに処理する必要があります。

アップデート2:

RE:ActionBlocks

オプションを1に設定するMaxDegreeOfParallelismと、単一に送信された作業項目の並列処理が妨げられるように見えますActionBlock。したがって、ActionBlockキューごとに問題を解決できるようですが、唯一の欠点は、MicrosoftからのTDFライブラリのインストールと展開が必要であり、純粋な.NET4.0ソリューションを望んでいたことです。これまでのところ、これは候補として受け入れられた答えです。ただし、キューごとにワーカースレッドに縮退しない純粋な.NET 4.0ソリューション(私がすでに使用している)でこれを行う方法を誰かが理解できない限りです。

4

4 に答える 4

4

キューがたくさんあり、スレッドを拘束したくないとのことですが。キューごとにActionBlockを設定できます。ActionBlockは、必要なもののほとんどを自動化します。作業項目を順番に処理し、作業が保留中の場合にのみタスクを開始します。保留中の作業がない場合、タスク/スレッドはブロックされません。

于 2012-06-27T15:48:10.630 に答える
3

最良の方法は、とを使用することTask Parallel Library (TPL)ですContinuations。継続により、タスクのフローを作成できるだけでなく、例外を処理することもできます。これはTPLの優れた入門書です。しかし、あなたにいくつかのアイデアを与えるために...

を使用してTPLタスクを開始できます

Task task = Task.Factory.StartNew(() => 
{
    // Do some work here...
});

ここで、先行タスクが(エラーまたは正常に)終了したときに2番目のタスクを開始するには、このContinueWithメソッドを使用できます。

Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task"));
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation..."));

したがって、task1完了するか、失敗するか、キャンセルされるとすぐにtask2「起動」して実行を開始します。task1コードの2行目に到達する前に完了した場合は、task2すぐに実行するようにスケジュールされることに注意してください。2番目のantTaskラムダに渡される引数は、先行タスクへの参照です。より詳細な例については、このリンクを参照してください...

先行タスクからの継続結果を渡すこともできます

Task.Factory.StartNew<int>(() => 1)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64.

ノート。提供されている最初のリンクで例外処理を必ず読んでください。これにより、新規参入者がTPLを誤ってしまう可能性があります。

特に必要なものについて最後に確認する必要があるのは、子タスクです。子タスクは、として作成されるタスクですAttachedToParent。この場合、すべての子タスクが完了するまで継続は実行されません

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
    Task.Factory.StartNew(() => { SomeMethod() }, atp);
    Task.Factory.StartNew(() => { SomeOtherMethod() }, atp); 
}).ContinueWith( cont => { Console.WriteLine("Finished!") });

これがお役に立てば幸いです。

編集:ConcurrentCollections特にをご覧になりましたかBlockngCollection<T>。だからあなたの場合、あなたは次のようなものを使うかもしれません

public class TaskQueue : IDisposable
{
    BlockingCollection<Action> taskX = new BlockingCollection<Action>();

    public TaskQueue(int taskCount)
    {
        // Create and start new Task for each consumer.
        for (int i = 0; i < taskCount; i++)
            Task.Factory.StartNew(Consumer);  
    }

    public void Dispose() { taskX.CompleteAdding(); }

    public void EnqueueTask (Action action) { taskX.Add(Action); }

    void Consumer()
    {
        // This seq. that we are enumerating will BLOCK when no elements
        // are avalible and will end when CompleteAdding is called.
        foreach (Action action in taskX.GetConsumingEnumerable())
            action(); // Perform your task.
    }
}
于 2012-06-27T15:32:47.187 に答える
1

親タスクをどこかに保存する必要があるという事実を隠しながら、TPLに基づく.NET4.0ソリューションが可能です。例えば:

class QueuePool
{
    private readonly Task[] _queues;

    public QueuePool(int queueCount)
    { _queues = new Task[queueCount]; }

    public void Enqueue(int queueIndex, Action action)
    {
        lock (_queues)
        {
           var parent = _queue[queueIndex];
           if (parent == null)
               _queues[queueIndex] = Task.Factory.StartNew(action);
           else
               _queues[queueIndex] = parent.ContinueWith(_ => action());
        }
    }
}

これは、アイデアを説明するために、すべてのキューに単一のロックを使用しています。ただし、本番コードでは、競合を減らすためにキューごとにロックを使用します。

于 2012-06-28T03:27:49.620 に答える
0

あなたがすでに持っているデザインは良くて機能しているようです。ワーカースレッド(キューごとに1つ)は長時間実行されるため、代わりにタスクを使用するTaskCreationOptions.LongRunning場合は、専用のワーカースレッドを取得するように指定してください。

ただし、ここでThreadPoolを使用する必要は実際にはありません。長時間の作業には多くのメリットはありません。

于 2012-06-27T15:33:56.747 に答える