11

ある種のタスクバッファを実装する必要があります。基本的な要件は次のとおりです。

  • 単一のバックグラウンドスレッドでタスクを処理する
  • 複数のスレッドからタスクを受け取る
  • 受信したすべてのタスクを処理します。つまり、停止信号を受信した後、バッファリングされたタスクからバッファが排出されていることを確認します。
  • スレッドごとに受信するタスクの順序を維持する必要があります

以下のようなキューを使って実装することを考えていました。実装に関するフィードバックをいただければ幸いです。そのようなことを実装するための他のより明るいアイデアはありますか?

public class TestBuffer
{
    private readonly object queueLock = new object();
    private Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    {
    }

    public void start()
    {
        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    }

    private void run()
    {
        running = true;

        bool run = true;
        while(run)
        {
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            {
                // If the queue is currently empty and it is still running
                // we need to wait until we're told something changed
                if (queue.Count == 0 && running)
                {
                    Monitor.Wait(queueLock);
                }

                // Check there is something in the queue
                // Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }

            // If something was dequeued, handle it
            if (task != null)
            {
                handle(task);
            }

            // Lock the queue again and check whether we need to run again
            // Note - Make sure we drain the queue even if we are told to stop before it is emtpy
            lock (queueLock)
            {
                run = queue.Count > 0 || running;
            }
        }
    }

    public void enqueue(Task toEnqueue)
    {
        lock (queueLock)
        {
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        }
    }

    public void stop()
    {
        lock (queueLock)
        {
            running = false;
            Monitor.PulseAll(queueLock);
        }
    }

    public void handle(Task dequeued)
    {
        dequeued.execute();
    }
}
4

5 に答える 5

8

これは、すぐに使用できるBlockingCollectionを使用して実際に処理できます。

1つ以上のプロデューサーと1つ以上のコンシューマーを持つように設計されています。あなたの場合、複数のプロデューサーと1つのコンシューマーがあります。

停止信号を受信したら、その信号ハンドラーを使用します

  • 停止するシグナルプロデューサースレッド
  • BlockingCollectionインスタンスでCompleteAddingを呼び出します

コンシューマースレッドは、キューに入れられたすべてのアイテムが削除されて処理されるまで実行を継続します。その後、BlockingCollectionが完了したという条件が発生します。スレッドがその条件に遭遇すると、スレッドは終了します。

于 2012-09-11T18:05:31.853 に答える
7

実際、FIFOであるConcurrentQueueについて考える必要があります。適切でない場合は、スレッドセーフコレクションでその親戚のいくつかを試してください。これらを使用することにより、いくつかのリスクを回避できます。

于 2015-12-10T07:02:23.313 に答える
1

TPLDataFlowをご覧になることをお勧めします。BufferBlockはあなたが探しているものですが、それだけではありません。

于 2012-09-11T18:11:40.463 に答える
0

これには、.NET3.5でRxを使用できます。RCから出たことがないかもしれませんが、安定しており*、多くの本番システムで使用されていると思います。Subjectが必要ない場合は、.NET 3.5用のプリミティブ(同時コレクションなど)が見つかる可能性があります。これは、4.0まで.NETFrameworkに付属していなかったものを使用できます。

.net 3.5のRx(Reactive Extensions)の代替

*-Nitピッカーのコーナー:範囲外である可能性のある高度な時間ウィンドウを除いて、バッファー(カウントと時間による)、順序付け、およびスケジューラーはすべて安定しています。

于 2013-01-12T04:56:57.883 に答える
-1

スレッドセーフFIFOキューの軽量実装を見てください。これは、スレッドプールを使用する非ブロッキング同期ツールです。ほとんどの場合、独自のスレッドを作成するよりも、ブロッキング同期ツールをロックやミューテックスとして使用するよりも優れています。https://github.com/Gentlee/SerialQueue

使用法:

var queue = new SerialQueue();
var result = await queue.Enqueue(() => /* code to synchronize */);
于 2016-11-05T23:44:58.313 に答える