7

編集:この質問の要件が変更されました。以下の更新セクションを参照してください。

IAsyncEnumerable<int>200ミリ秒ごとに1つの数値(数値のストリーム)を生成する非同期反復子メソッドがあります。このメソッドの呼び出し元はストリームを消費しますが、1000 ミリ秒後に列挙を停止したいと考えています。したがって、 aが使用され、トークンが引数として拡張メソッドCancellationTokenSourceに渡されます。WithCancellationしかし、トークンは尊重されません。列挙は、すべての数値が消費されるまで続きます。

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

出力:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12 :55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

期待される出力は、TaskCanceledException5 番目の後に発生するWithCancellationです。 が実際に何をしているのかを誤解しているようです。メソッドは、提供されたトークンをイテレータ メソッドに渡すだけです (イテレータ メソッドがトークンを受け入れる場合)。それ以外の場合、私の例のメソッドと同様にGetSequence()、トークンは無視されます。私の場合の解決策は、列挙体の本体内のトークンを手動で調べることだと思います。

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

これは簡単で、うまく機能します。しかし、いずれにせよ、私が期待していたことを実行する拡張メソッドを作成してWithCancellation、後続の列挙内にトークンを焼き付けることができるかどうか疑問に思います。これは、必要なメソッドの署名です。

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}

更新: この質問をしたとき、キャンセルの概念全体の目的について誤った理解をしていたようです。キャンセルは の待機の後にループを壊すことを目的としているのMoveNextAsyncに対し、実際の目的は待機自体をキャンセルすることであるという印象を受けました。私の些細な例では、待機は 200 ミリ秒しか持続しませんが、実際の例では、待機はさらに長くなり、無限になることさえあります。これに気付いた後、現在の形式の私の質問にはほとんど価値がなく、削除して同じタイトルの新しい質問を開くか、既存の質問の要件を変更する必要があります。どちらのオプションも、ある意味で悪いです。

私は2番目のオプションを選択することにしました。したがって、私は現在受け入れられている回答を受け入れません。すぐに効果のある方法でキャンセルを強制するという、より困難な問題に対する新しい解決策を求めています。つまり、トークンをキャンセルすると、数ミリ秒で非同期列挙が完了するはずです。望ましい動作と望ましくない動作を区別するための実用的な例を挙げましょう。

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}

出力 (望ましい):

0:00.242 > 1
0:00.467 > 2
0:00.500 > キャンセル

出力 (望ましくない):

0:00.242 > 1
0:00.467 > 2
0:00.707 > キャンセル

GetSequence最初の例と同じ方法で、200 ミリ秒ごとに 1 つの数値をストリーミングします。このメソッドはキャンセルをサポートしていません。前提として、それを変更することはできません。WithEnforcedCancellationこの問題を解決するために必要な拡張メソッドです。

4

3 に答える 3

2

これを行うべきではないことを繰り返し強調することが重要だと思います。非同期メソッドがキャンセル トークンをサポートするようにすることを常にお勧めします。そうすれば、期待どおりすぐにキャンセルできます。それが不可能な場合でも、この回答を試す前に他の回答を試すことをお勧めします。

そうは言っても、 async メソッドにキャンセルのサポートを追加できず、foreachをすぐに終了する必要がある場合は、それを回避することができます。

1 つのトリックはTask.WhenAny、2 つの引数を使用することです。

  1. あなたが得る仕事IAsyncEnumerator.MoveNextAsync()
  2. キャンセルをサポートする別のタスク

ショートバージョンはこちら

// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
    yield return enumerator.Current;
}

完全を期すための長いバージョンはConfigureAwait(false)DisposeAsync()ローカルで実行すると機能するはずです。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        {
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (moveNext != null && !moveNext.IsCompleted)
            {
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!

                moveNext.ContinueWith(async _ =>
                {
                    await enumerator.DisposeAsync();
                }, TaskScheduler.Default);
#pragma warning restore 4014
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync();
            }
        }
    }

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    }
}

public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}

注意事項

列挙子は、パフォーマンスを向上させるために ValueTask を返します (通常のタスクよりも少ない割り当てを使用します) が、ValueTask は と共に使用できないため、割り当てオーバーヘッドを導入することによってパフォーマンスを低下させるTask.WhenAny()ために使用されます。AsTask()

列挙子は、最新のMoveNextAsync()ものが完了した場合にのみ破棄できます。キャンセルが要求されたときにタスクがまだ実行されている可能性が高くなります。DisposeAsyncそのため、継続タスクに別の呼び出しを追加しました。

WithEnforcedCancellation()このシナリオでは、メソッドの終了時に列挙子はまだ破棄されていません。列挙が放棄された後、不確定な時間で破棄されます。がDisposeAsync()例外をスローすると、例外は失われます。コール スタックがないため、コール スタックをバブルアップできません。

于 2021-09-02T00:49:38.140 に答える