124

私は現在、.NET 用の Reactive Extensions フレームワークに取り組み始めており、見つけたさまざまな紹介リソース (主にhttp://www.introtorx.com )を調べています。

私たちのアプリケーションには、ネットワーク フレームを検出する多くのハードウェア インターフェイスが含まれます。これらは私の IObservable になります。その後、これらのフレームを消費したり、データに対して何らかの変換を実行したり、新しいタイプのフレームを生成したりするさまざまなコンポーネントがあります。たとえば、n フレームごとに表示する必要がある他のコンポーネントもあります。Rx は私たちのアプリケーションに役立つと確信していますが、IObserver インターフェイスの実装の詳細に苦労しています。

私が読んだリソースのほとんど (すべてではないにしても) は、IObservable インターフェイスを自分で実装するのではなく、提供されている関数またはクラスのいずれかを使用するべきだと述べています。私の調査によると、 を作成するSubject<IBaseFrame>と必要なものが得られるようです。ハードウェア インターフェイスからデータを読み取り、Subject<IBaseFrame>インスタンスの OnNext 関数を呼び出す単一のスレッドが作成されます。その後、さまざまな IObserver コンポーネントがそのサブジェクトから通知を受け取ります。

私の混乱は、このチュートリアルの付録にある次のアドバイスから来ています。

サブジェクト タイプの使用は避けてください。Rx は事実上、関数型プログラミングのパラダイムです。サブジェクトを使用するということは、潜在的に変化する状態を管理していることを意味します。変更状態と非同期プログラミングの両方を同時に処理することは、非常に困難です。さらに、多くの演算子 (拡張メソッド) は、サブスクリプションとシーケンスの正確で一貫した有効期間が維持されるように注意深く記述されています。主題を導入するとき、これを破ることができます。サブジェクトを明示的に使用すると、将来のリリースでもパフォーマンスが大幅に低下する可能性があります。

私のアプリケーションは非常にパフォーマンスが重要です。Rx パターンを使用してパフォーマンスをテストしてから、実稼働コードに入れる予定です。ただし、Subject クラスを使用して Rx フレームワークの精神に反することを行っていること、およびフレームワークの将来のバージョンでパフォーマンスが低下することを懸念しています。

私がやりたいことをするより良い方法はありますか?ハードウェア ポーリング スレッドは、オブザーバーがあるかどうかに関係なく継続的に実行されるため (それ以外の場合は HW バッファーがバックアップされます)、これは非常にホットなシーケンスです。次に、受信したフレームを複数のオブザーバーに渡す必要があります。

アドバイスをいただければ幸いです。

4

5 に答える 5

86

わかりました、私の独断的な方法を無視し、「主題が良い/悪い」をすべて一緒に無視するとします。問題空間を見てみましょう。

2 つのスタイルのシステムのうちの 1 つを持っているに違いありません。

  1. メッセージが到着すると、システムはイベントまたはコールバックを発生させます
  2. システムをポーリングして、処理するメッセージがあるかどうかを確認する必要があります

オプション 1 の場合は簡単です。適切な FromEvent メソッドでラップするだけで完了です。パブへ!

オプション 2 では、これをポーリングする方法と、これを効率的に行う方法を検討する必要があります。また、値を取得したら、どのように公開しますか?

ポーリング専用のスレッドが必要になると思います。他のコーダーが ThreadPool/TaskPool を叩き、ThreadPool 飢餓状態に陥らせたくないでしょう。あるいは、コンテキスト切り替えの面倒を望まないでしょう (私は推測します)。したがって、独自のスレッドがあると仮定すると、ポーリングするために座っているある種の While/Sleep ループがおそらくあるでしょう。チェックでいくつかのメッセージが見つかると、それらを公開します。これらはすべて、Observable.Create にぴったりのように思えます。キャンセルを許可するために Disposable を返すことができないため、While ループを使用することはおそらくできません。幸いなことに、あなたは本全体を読んでいるので、再帰的スケジューリングに精通しています!

このようなものがうまくいくと思います。#未検証

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

私がサブジェクトが本当に好きではない理由は、通常、開発者が問題について明確な設計をしていない場合です。主題をハックし、あちこちでそれを突き刺し、貧弱なサポート開発者にWTFが起こっていると推測させます. Create/Generate などのメソッドを使用すると、シーケンスへの影響がローカライズされます。すべてを 1 つのメソッドで確認でき、他の誰も厄介な副作用を引き起こしていないことがわかります。サブジェクト フィールドが表示された場合は、そのフィールドが使用されているクラス内のすべての場所を探す必要があります。一部のMFerが1つを公開した場合、このシーケンスがどのように使用されているかを知っている人は、すべての賭けがオフになります! 非同期/同時実行/Rx は難しいです。副作用や因果関係のプログラミングに頭を悩ませて、難しくする必要はありません。

于 2013-01-22T14:16:40.513 に答える
43

一般に、 の使用は避けるべきSubjectですが、ここで行っていることについては、非常にうまく機能すると思います。Rx チュートリアルで「対象を避ける」というメッセージに出くわしたときに、同様の質問をしました。

(Rxxの)デイブ・セクストンを引用するには

「サブジェクトは Rx のステートフル コンポーネントです。フィールドまたはローカル変数としてイベントのようなオブザーバブルを作成する必要がある場合に役立ちます。」

私はそれらを Rx へのエントリ ポイントとして使用する傾向があります。したがって、「何かが起こった」(あなたが持っているように) と言う必要があるコードがある場合は、 a を使用してSubjectを呼び出しますOnNext。次に、それIObservableを他の人がサブスクライブできるようAsObservable()に公開します (サブジェクトで使用して、誰もサブジェクトにキャストして物事を台無しにできないようにすることができます)。

.NET イベントを使用して を使用することもできますがFromEventPattern、とにかくイベントを に変えるだけならIObservable、イベントの代わりにイベントを使用するメリットがわかりませんSubject(つまり、ここに何か)

IObservableただし、絶対に避けるべきことは、 aを使用して an をサブスクライブすることです。つまり、メソッドに aをSubject渡さないでください。SubjectIObservable.Subscribe

于 2013-01-18T19:56:31.403 に答える
8

引用されたブロック テキストは、なぜ を使用してはいけないのかを説明していますSubject<T>が、簡単に言うと、observer と observable の機能を組み合わせて、その間にある種の状態を挿入しています (カプセル化するか拡張するかに関係なく)。

ここで問題が発生します。これらの責任は分離され、互いに区別されるべきです。

とはいえ、あなたの特定のケースでは、懸念をより小さな部分に分割することをお勧めします。

まず、ホットなスレッドがあり、通知を発生させるシグナルについてハードウェアを常に監視しています。これを通常はどのように行いますか? イベント。それでは、それから始めましょう。

EventArgsイベントが発生することを定義しましょう。

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

次に、イベントを発生させるクラスです。これは、静的クラス (ハードウェア バッファーを監視するスレッドが常に実行されているため)、またはそれにサブスクライブするオンデマンドと呼ばれるものである可能性があることに注意してください。これを適宜変更する必要があります。

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

これで、イベントを公開するクラスができました。オブザーバブルはイベントとうまく連携します。class のstaticメソッドIObservable<T>介して、標準のイベント パターンに従う場合、イベントのストリーム (イベント ストリームをイベントの複数の発生と考えてください) を実装に変換するためのファーストクラスのサポートがあるほどですFromEventPatternObservable

イベントのソースとFromEventPatternメソッドを使用して、次のIObservable<EventPattern<BaseFrameEventArgs>>ように簡単に作成できます (EventPattern<TEventArgs>クラスは、.NET イベントで表示されるもの、特に派生したインスタンスEventArgsと送信者を表すオブジェクトを具体化します)。

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

もちろん、 が必要ですが、クラスの拡張メソッドIObservable<IBaseFrame>を使用してプロジェクションを作成するのは簡単です (LINQ で行うのと同じように、これらすべてを使いやすいメソッドにまとめることができます)。SelectObservable

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
于 2013-01-18T18:02:26.463 に答える