24

C# 4.0 の悲しい時代に、IEnumerable の "yield return" 継続をハッキングしてオブザーバブルを待機することで、GUI スレッドで非同期ワークフローを許可する次の "WorkflowExecutor" クラスを作成しました。したがって、次のコードは、button1Click で、テキストを更新する単純なワークフローを開始し、button2 がクリックされるのを待って、1 秒後にループします。

public sealed partial class Form1 : Form {
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

    public Form1() {
        InitializeComponent();
    }

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
        Text = "Initializing";
        var scheduler = new ControlScheduler(this);
        while (true) {
            yield return scheduler.WaitTimer(1000);
            Text = "Waiting for Click";
            yield return _button2Subject;
            Text = "Click Detected!";
            yield return scheduler.WaitTimer(1000);
            Text = "Restarting";
        }
    }

    void button1_Click(object sender, EventArgs e) {
        _workflowExecutor.Run(CreateAsyncHandler());
    }

    void button2_Click(object sender, EventArgs e) {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e) {
        _workflowExecutor.Stop();
    }
}

public static class TimerHelper {
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
        return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
    }
}

public sealed class WorkflowExecutor {
    IEnumerator<IObservable<Unit>> _observables;
    IDisposable _subscription;

    public void Run(IEnumerable<IObservable<Unit>> actions) {
        _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
        Continue();
    }

    void Continue() {
        if (_subscription != null) {
            _subscription.Dispose();
        }
        if (_observables.MoveNext()) {
            _subscription = _observables.Current.Subscribe(_ => Continue());
        }
    }

    public void Stop() {
        Run(null);
    }
}

「yield」継続を使用して非同期作業を行うというアイデアのスマートな部分は、Daniel Earwickerの AsyncIOPipe アイデアから引用されました。return-of-lambdas/、その上にリアクティブ フレームワークを追加しました。

現在、C# 5.0 の async 機能を使用してこれを書き直すのに問題がありますが、それは簡単なことのようです。オブザーバブルをタスクに変換すると、一度だけ実行され、while ループが 2 回目にクラッシュします。それを修正する助けは素晴らしいでしょう。

言った/尋ねたすべてのことですが、async/await メカニズムによって、WorkflowExecutor では得られないものは何ですか? WorkflowExecutor では (同様の量のコードが与えられた場合) できないことで、async/await でできることはありますか?

4

2 に答える 2

37

Jamesが述べたように、Rxv2.0Betaで始まるIObservable<T>シーケンスを待つことができます。動作は、最後の要素(OnCompletedの前)を返すか、観察されたOnErrorをスローすることです。シーケンスに要素が含まれていない場合、InvalidOperationExceptionが発生します。

これを使用していることに注意してください。他のすべての望ましい動作を取得できます。

  • xs.FirstAsync()を待って最初の要素を取得します
  • xs.SingleAsync()を待って、値が1つだけであることを確認します
  • 空のシーケンスで問題がない場合は、xs.DefaultIfEmpty()を待ちます
  • すべての要素を取得するには、xs.ToArray()を待つかxs.ToList()を待ちます

集計の結果を計算するなど、さらに凝ったことを行うことができますが、DoとScanを使用して中間値を観察できます。

var xs = Observable.Range(0, 10, Scheduler.Default);

var res = xs.Scan((x, y) => x + y)
            .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

Console.WriteLine("Done! The sum is {0}", await res);
于 2012-05-24T17:08:21.807 に答える
31

お気づきのように、Observable の「イベントのストリーム」とは対照的に、Task は 1 回限りの使用です。これ (IMHO) を考える良い方法は、2.0 Beta に関する Rx チームの投稿の 2x2 チャートです。

タスク vs オブザーバブルの 2x2 チャート

状況 (イベントの 1 回限りまたは「ストリーム」) によっては、Observable を保持する方が理にかなっている場合があります。

Reactive 2.0 Beta に移行できる場合は、それを使用してオブザーバブルを「待機」できます。たとえば、コードの「非同期/待機」(おおよその) バージョンでの私自身の試みは次のようになります。

public sealed partial class Form1 : Form
{
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();

    private bool shouldRun = false;

    public Form1()
    {
        InitializeComponent();
    }

    async Task CreateAsyncHandler()
    {
        Text = "Initializing";
        while (shouldRun)
        {
            await Task.Delay(1000);
            Text = "Waiting for Click";
            await _button2Subject.FirstAsync();
            Text = "Click Detected!";
            await Task.Delay(1000);
            Text = "Restarting";
        }
    }

    async void button1_Click(object sender, EventArgs e)
    {
        shouldRun = true;
        await CreateAsyncHandler();
    }

    void button2_Click(object sender, EventArgs e)
    {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e)
    {
        shouldRun = false;
    }
}
于 2012-04-24T02:19:32.907 に答える