3

複数のスレッドからアクセスされるクラスのインスタンスがあります。このクラスはこの呼び出しを受け取り、タプルをデータベースに追加します。データベースの制約により、並列スレッドによってデータベースの一貫性が失われる可能性があるため、これをシリアルに行う必要があります。

私は C# での並列処理と同時実行が初めてなので、次のようにしました。

private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

public void AddDData(string info)
{
    Task t = new Task(() => { InsertDataIntoBase(info); });
    _tasks.Add(t);
}

private void InsertWorker()
{
    Task.Factory.StartNew(() =>
    {
        while (!_tasks.IsCompleted)
        {
            Task t;
            if (_tasks.TryTake(out t))
            {
                t.Start();
                t.Wait();
            }
        }
    });
}

これAddDDataは、複数のスレッドによって呼び出されるスレッドでInsertDataIntoBaseあり、数ミリ秒かかる非常に単純な挿入です。

問題は、なんらかの理由で、私の知識不足で理解できないことです。タスクが 2 回呼び出されることがあります。常に次のようになります。

T1 T2 T3 T1 ← PK エラー。T4 ...

私は.Take()完全に間違っていることを理解しましたか、何かが欠けているのでしょうか、それともプロデューサー/コンシューマーの実装が本当に悪いのでしょうか?

よろしく、 ラファエル

アップデート:

示唆されたように、私はこのアーキテクチャで簡単なサンドボックス テストの実装を作成しました。私が疑っていたように、前のタスクが終了する前にタスクが起動されないことを保証するものではありません。

ここに画像の説明を入力

問題は残ります: タスクを適切にキューに入れ、順番に起動するにはどうすればよいでしょうか?

更新 2:

コードを簡略化しました:

private BlockingCollection<Data> _tasks = new BlockingCollection<Data>();

public void AddDData(Data info)
{
    _tasks.Add(info);
}

private void InsertWorker()
{
    Task.Factory.StartNew(() =>
    {
        while (!_tasks.IsCompleted)
        {
            Data info;
            if (_tasks.TryTake(out info))
            {
                InsertIntoDB(info);
            }
        }
    });
}

同期された InsertIntoDB 呼び出しに依存しているため (ループ内にあるため)、Tasks を削除したことに注意してください。列。しかし、何度やっても、同じオブジェクトが 2 回使用されることがあります。

4

3 に答える 3

1

私はこれがうまくいくと思います:

    private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        InsertWorker();
        GenerateItems(10, 1000);
        _itemsToProcess.CompleteAdding();
    }

    private static void InsertWorker()
    {
        Task.Factory.StartNew(() =>
        {
            while (!_itemsToProcess.IsCompleted)
            {
                string t;
                if (_itemsToProcess.TryTake(out t))
                {
                    // Do whatever needs doing here
                    // Order should be guaranteed since BlockingCollection 
                    // uses a ConcurrentQueue as a backing store by default.
                    // http://msdn.microsoft.com/en-us/library/dd287184.aspx#remarksToggle
                    Console.WriteLine(t);
                }
            }
        });
    }

    private static void GenerateItems(int count, int maxDelayInMs)
    {
        Random r = new Random();
        string[] items = new string[count];

        for (int i = 0; i < count; i++)
        {
            items[i] = i.ToString();
        }

        // Simulate many threads adding items to the collection
        items
            .AsParallel()
            .WithDegreeOfParallelism(4)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select((x) =>
            {
                Thread.Sleep(r.Next(maxDelayInMs));
                _itemsToProcess.Add(x);
                return x;
            }).ToList();
    }

これは、コンシューマーがシングル スレッドであることを意味しますが、複数のプロデューサー スレッドを許可します。

于 2013-09-11T20:29:34.427 に答える
0

コメントより

「データは文字列ではないため、ここに示すコードを単純化しました」

infoAddDData に渡されるパラメーターは、変更可能な参照型であると想定しています。infoその参照は Task lambda でキャプチャされるため、呼び出し元が複数の呼び出しに同じインスタンスを使用していないことを確認してください。

于 2013-09-11T19:52:40.273 に答える
0

提供したトレースに基づいて、唯一の論理的な可能性は、InsertWorker2 回 (またはそれ以上) 呼び出したことです。したがって、2 つのバックグラウンド スレッドがアイテムがコレクションに表示されるのを待っており、場合によっては両方ともアイテムを取得して実行を開始することがあります。

于 2013-09-12T06:23:47.593 に答える