6

この質問のコードに基づいて、並列/消費者の実装を採用しました

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

そして、ここにテストコードがあります

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));



    }
}

テストは常に失敗します。テストされた 100 個の項目のうち、常に 93 番目の項目でスタックします。私のコードのどの部分がこの問題を引き起こしたのか、またそれを修正する方法はありますか?

4

2 に答える 2

2

失敗の理由は、ここで説明されているように、次の理由によるものです

Parallel.ForEach と PLINQ の両方で既定で採用されているパーティショニング アルゴリズムは、同期コストを最小限に抑えるためにチャンクを使用します。要素ごとに 1 回ロックを取得するのではなく、ロックを取得し、要素のグループ (チャンク) を取得してから、ロックを解除します。

それを機能させるには、ParallelConsumer<T>クラスにメソッドを追加して、以下のように追加が完了したことを示すことができます。

    public void StopAdding()
    {
        _entries.CompleteAdding();
    }

for loopそして、以下のように、 の後にこのメソッドを呼び出します

        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        consumer.StopAdding();

それ以外の場合Parallel.ForEach()は、チャンクを取得して処理を開始するために、しきい値に達するまで待機します。

于 2013-07-24T09:09:54.533 に答える