7

作業を生成する 1 つのプロセスと、BlockingCollection<>その作業を消費する 2 番目のプロセスがあります。プログラムを閉じるときは、コンシューマーが作業を消費するのを止める必要がありますが、保留中だったが消費されていない作業をすばやくログに記録する必要があります。

foreach (<object> in BlockingCollection.GetConsumingEnumerable())現在、私の消費者はループを持つスレッドを生成しています。プログラムを停止すると、プロデューサーが を呼び出しますConsumer.BlockingCollection.CompleteAdding()。私が見つけたのは、私の消費者がキュー内のすべてを処理し続けていることです。

問題をグーグルで検索すると、CancellationToken. だから私はそれを試しました:

private void Process () { // This method runs in a separate thread
    try {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCancelledException) {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

私のプロデューサーは:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

これを試してみると、OperationCancelledException が発生したという兆候はありません。

この質問は、キャンセル トークンの使用を説明しようとしていますが、正しく使用されていないようです。(引数:機能する場合、それは「十分に正しい」です。)そして、この質問は私の質問とまったく同じように見えますが、例はありません。(ここも同じ)

繰り返しになりますが、別の方法を使用してキャンセルされた後、キュー内の残りのアイテムを処理する必要があるという警告とともに、CancellationTokenonを適切に使用するにはどうすればよいですか?BlockingCollection.GetConsumingEnumerable()

(私の問題は、CancellationToken の適切な使用に集中していると思います。私のテストでは、プロセスが実際にキャンセルされていることは示されていません。(StopFlag.IsCancellationRequested常に等しいfalse))

4

2 に答える 2

5

に渡すと、CancellationTokenキャンセルGetConsumingEnumerableの例外がスローされず、アイテムの吐き出しが停止します。例外をキャッチするのではなく、トークンを確認します。

foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }

また、キャンセルが要求CompletedAddingされ、 が呼び出されていない可能性がある場合は、 を呼び出すのではなく、コレクションを反復処理する必要があることに注意してくださいGetConsumingEnumerable。操作がキャンセルされたときにプロデューサーが追加を完了することがわかっている場合、それは問題ではありません。

于 2013-11-11T17:40:10.763 に答える
3

私の問題は、操作をキャンセルしようとしていた方法にありました。プロデューサが CancellationTokenSource を所有する代わりに、すべてをコンシューマに配置します。

public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}
于 2013-11-11T18:14:00.250 に答える