2

処理する要素を持つコレクションがあり、最大で4つの要素のみを一緒に処理できます。実行時に、すべてのプロセスが一緒に開始され、すべてが待機状態になります。一度に処理される要素は4つだけです。

問題は、すべてのスレッドがリソースが解放されるのを待っているため、処理要素がランダムに選択されることです。最初の要素がコレクションの最後の要素になる可能性があることを意味します。

ただし、要素をコレクションに含めるために処理する必要があります。

どうすればこれを達成できますか?

TPLとC#4.0を使用しています

4

3 に答える 3

5

並列処理では、「順番に」が何を意味するかを定義するという問題が常にあります。100個のアイテムのコレクションがあるとします。それらを「一度に4つずつ」(要求どおりに)処理するということは、次のことを意味します。

  1. 順序が緩い:4つのスレッドを使用し、元のコレクションの順序でタスクを発行します。

    この場合、次を使用できます。

    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
             (item) =>
             {
                 // code
             });
    

    不均衡なタスクの場合、一部のスレッドが重いタスクに遅れをとる可能性があるため、これにより元の順序がすぐに失われますが、タスクは順番に割り当てられます。

  2. 厳密な順序付け:以下のように4つのグループで順番に処理します。

                   0 1 2 3                
                   4 tasks
         _____________________________
                    barrier
    
                   4 5 6 7                
                   4 tasks
         _____________________________
                    barrier
    
                     etc.
    

    この場合、バリアを使用できます。

    Barrier b = new Barrier(4);
    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
        (item) =>
        {
            // code
            b.SignalAndWait();
        });
    

    タスクの数が4の倍数であることを確認する必要がありますが、そうでない場合、最後の反復でバリアが通知されません。

  3. 1つのタスクで4つのアイテムを処理する:元のリストの4つのアイテムをカプセル化するタスクオブジェクトを作成してから、最初の場合と同様に簡単に実行できますParallel.ForEach(つまり、各スレッドは1つのタスクの一部として4つのアイテムを順番に処理します)。これにより、4つのグループでタスクが順番に発行されますが、タスクに時間がかかりすぎると、一部のスレッドが遅れる可能性があります。

于 2012-09-03T06:25:01.913 に答える
0

これが私がこのタスクを達成した方法です

public delegate void ProcessFinished(IParallelProcess process);
public interface IParallelProcess
{
    void Start();
    event ProcessFinished ProcessFinished;
}

public class ParallelProcessBasket : ConcurrentQueue<IParallelProcess>
{
    public void Put(IParallelProcess process)
    {
        base.Enqueue(process);
    }
    public IParallelProcess Get()
    {
        IParallelProcess process = null;
        base.TryDequeue(out process);
        return process;
    }
}
public class ParallelProcessor<T> where T : class
{
    private ParallelProcessBasket basket;
    private readonly int MAX_DEGREE_OF_PARALLELISM;
    private Action<T> action;
    public ParallelProcessor(int degreeOfParallelism, IEnumerable<IParallelProcess> processes, Action<T> action)
    {
        basket = new ParallelProcessBasket();
        this.action = action;
        processes.ToList().ForEach(
            (p) =>
            {
                basket.Enqueue(p);
                p.ProcessFinished += new ProcessFinished(p_ProcessFinished);
            });
        MAX_DEGREE_OF_PARALLELISM = degreeOfParallelism;
    }

    private void p_ProcessFinished(IParallelProcess process)
    {
        if (!basket.IsEmpty)
        {
            T element = basket.Get() as T;
            if (element != null)
            {
                Task.Factory.StartNew(() => action(element));
            }
        }
    }


    public void StartProcessing()
    {
        // take first level of iteration
        for (int cnt = 0; cnt < MAX_DEGREE_OF_PARALLELISM; cnt++)
        {
            if (!basket.IsEmpty)
            {
                T element = basket.Get() as T;
                if (element != null)
                {
                    Task.Factory.StartNew(() => action(element));
                }
            }
        }
    }
}
static void Main(string[] args)    
{
     ParallelProcessor<ParallelTask> pr = new ParallelProcessor<ParallelTask>(Environment.ProcessorCount, collection, (e) => e.Method1());
            pr.StartProcessing();
}

ありがとう..

于 2012-10-02T06:39:48.763 に答える
0

「要素がランダムに選択される」場合、正確に何をしているのかわかりません。ただし、を使用する場合はParalle.ForEach()、効率的にしようとするため、入力シーケンスを何らかの方法で分割します。入力シーケンスがの場合はIList<T>範囲​​分割を使用し、そうでない場合はチャンク分割を使用します(PLINQのチャンク分割と範囲分割を参照)。

アイテムを順番に処理する場合は、カスタムパーティショナーParallel.ForEach()を使用して構成できます。これにより、コレクションがサイズ1のチャンクに分割されます。

ただし、ここでは実際には必要ないためParallel.ForEach()、おそらくより簡単な解決策は、アイテムを1つずつ処理する4つのタスクを作成することです。同期には、を使用できますBlockingCollection。何かのようなもの:

public static class ParallelOrdered
{
    public static void ForEach<T>(IEnumerable<T> collection, Action<T> action, int degreeOfParallelism)
    {
        var blockingCollection = new BlockingCollection<T>();
        foreach (var item in collection)
            blockingCollection.Add(item);
        blockingCollection.CompleteAdding();

        var tasks = new Task[degreeOfParallelism];
        for (int i = 0; i < degreeOfParallelism; i++)
        {
            tasks[i] = Task.Factory.StartNew(
                () =>
                {
                    foreach (var item in blockingCollection.GetConsumingEnumerable())
                        action(item);
                });
        }
        Task.WaitAll(tasks);
    }
}
于 2012-09-03T09:59:58.747 に答える