8

次のコードでは、CancellationToken を使用して、プロデューサーが生成していないときに GetConsumingEnumerable() をウェイクアップし、foreach から抜け出してタスクを終了したいと考えています。しかし、 IsCancellationRequested がログに記録されているのが見えず、 Task.Wait(timeOut) が timeOut 期間全体を待機します。私は何を間違っていますか?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

後で...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
4

2 に答える 2

11

CompleteAdding()これ以上アイテムがコレクションに追加されないことを示す which を使用できます。が使用されている場合GetConsumingEnumerable、 foreach はそれ以上のアイテムを待つ意味がないことを認識しているため、正常に終了します。

基本的に、アイテムの追加が完了したら、次のことをBlockingCollection行います。

myBlockingCollection.CompleteAdding() 

GetConsumingEnumerable で foreach ループを実行しているスレッドは、ループを停止します。

于 2012-09-27T11:12:23.827 に答える
5

簡単なプロトタイプを作成しましたが、うまくいくようです。

トークンキャンセル要求の直前の Thread.Sleep(1000) に注意してください。異なるスレッドで変数を作成してアクセスするため、トークン変数の競合状態を作成している可能性があります。item.CancelToken

たとえば、タスクをキャンセルすることを意図したコードは、間違った (前または null) キャンセル トークンでキャンセルを呼び出す可能性があります。

static void Main(string[] args)
{
    CancellationTokenSource token = null;
    BlockingCollection<string> coll = new BlockingCollection<string>();
    var t = Task.Factory.StartNew(state =>
    {
        token = new CancellationTokenSource();
        try
        {
            foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Cancel");
            return;
        }
    }, "TaskSubscribe", TaskCreationOptions.LongRunning);
    Thread.Sleep(1000);
    token.Cancel();
    t.Wait();
}
于 2012-02-04T03:39:56.987 に答える