16

最近、.NET 4.0 の TPL に関するポッドキャストをたくさん耳にしました。それらのほとんどは、作業が GUI スレッドに干渉しないようにタスクを使用して、画像のダウンロードや計算の実行などのバックグラウンド アクティビティについて説明しています。

私が取り組んでいるコードのほとんどは、複数のソースからの作業項目をキューに入れ、順番に処理する必要がある、複数のプロデューサー/単一のコンシューマーのフレーバーを持っています。1 つの例は、ファイルまたはデータベースへの最終的な書き込みのために、複数のスレッドからのログ行が 1 つのキューに順番に並べられるロギングです。単一のソースからのすべてのレコードは順序どおりに維持する必要があり、同じ時点からのレコードは、最終的な出力で互いに「近い」必要があります。

したがって、複数のスレッドまたはタスク、またはすべてがキューラーを呼び出しています。

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

専用のワーカー スレッドがキューを処理します。

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

これらのタスクのコンシューマー側専用のワーカー スレッドを用意することは、常に合理的であるように思われます。代わりに、TPL の構造を使用して将来のプログラムを作成する必要がありますか? どれ?なんで?

4

4 に答える 4

13

Wilka によって提案されているように、BlockingCollection からのアイテムを処理するために長時間実行される Task を使用できます。アプリケーションの要件をほぼ満たしている例を次に示します。次のような出力が表示されます。

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

A、B、C、D からの出力は、スレッドの開始時間に依存するためランダムに表示されるわけではありませんが、B は常に B1 の前に表示されます。

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}
于 2010-05-06T07:20:02.350 に答える
5

この回答が約 1 年遅れていることは承知していますが、MSDNをご覧ください。

TaskScheduler クラスから LimitedConcurrencyLevelTask​​Scheduler を作成する方法を示します。同時実行を単一のタスクに制限することで、次の方法でキューに入れられた順にタスクを処理する必要があります。

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});
于 2011-02-09T10:19:26.253 に答える
3

あなたのユースケースでTPLが適切かどうかはわかりません。私の理解では、TPL の主な使用例は、1 つの巨大なタスクを複数の小さなタスクに分割して並べて実行することです。たとえば、大きなリストがあり、各要素に同じ変換を適用したい場合です。この場合、リストのサブセットに変換を適用するいくつかのタスクを持つことができます。

あなたが説明するケースは、私にとってこの写真には当てはまらないようです. あなたの場合、同じことを並行して行ういくつかのタスクはありません。それぞれが独自のジョブ (プロデューサー) と消費する 1 つのタスクを実行するいくつかの異なるタスクがあります。複数のコンシューマーが必要な場合は、おそらく TPL をコンシューマー部分に使用できます。この場合、各コンシューマーが同じジョブを実行するためです (探している一時的な一貫性を強制するロジックが見つかったと仮定します)。

まあ、これはもちろん、この件に関する私の個人的な見解です

長く生きると繁栄

于 2010-02-23T12:46:24.497 に答える
2

BlockingCollectionが便利なようです。したがって、上記のコードでは、次のようなものを使用できます (_queueBlockingCollectionインスタンスであると仮定します):

// for your producers 
_queue.Add(some_work);

キューを処理する専用のワーカー スレッド:

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

注: すべての生産者が生産を終了したら、電話をかける必要がありCompleteAdding()ます_queue

于 2010-03-21T18:44:12.823 に答える