10

私は最近IObservableについて読んでいます。これまでのところ、さまざまな SO の質問を見て、何ができるかについてのビデオを見てきました。私が考えている「プッシュ」メカニズム全体は素晴らしいですが、すべてが正確に何をするのかをまだ理解しようとしています. 私の読書から、私はある意味IObservableで「見る」ことができるものでありIObservers、「ウォッチャー」であると推測します.

だから今、私は自分のアプリケーションでこれを実装しようとしています。始める前に、私が気をつけたいことがいくつかあります。IObservable は IEnumerable の反対であることがわかりましたが、特定のインスタンスでアプリに組み込むことができる場所を実際に確認することはできません。

現在、私はイベントを多用しているため、「配管」が管理不能になり始めていることがわかります。IObservable がここで私を助けることができると思います。

次の設計を検討してください。これは、アプリケーション内の I/O のラッパーです (参考までに、通常は文字列を処理する必要があります)。

私はと呼ばれる基本インターフェースを持っていますIDataIO:

public interface IDataIO
{
  event OnDataReceived;
  event OnTimeout:
  event OnTransmit;
}

現在、このインターフェイスを実装する 3 つのクラスがあります。これらの各クラスは、何らかの方法で Async メソッド呼び出しを利用し、ある種のマルチスレッド処理を導入しています。

public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;

これらの各クラスの IO と呼ばれる最終的なクラスにラップされた単一のインスタンスがあります (これは IDataIO も実装しています - 私の戦略パターンに準拠しています)。

public class IO : IDataIO
{
  public SerialIO Serial;
  public UdpIO Udp;
  public TcpIO Tcp;
}

ストラテジー パターンを利用してこれら 3 つのクラスをカプセル化したので、実行時に異なるIDataIOインスタンス間で変更しても、エンド ユーザーには「見えない」ようになりました。ご想像のとおり、これにより、バックグラウンドでかなりの「イベント配管」が行われました。

では、私の場合、ここで「プッシュ」通知をどのように利用できますか? イベント (DataReceived など) をサブスクライブする代わりに、関心のある人にデータをプッシュするだけです。どこから始めればよいか少しわかりません。私はまだ のアイデア/ジェネリック クラスSubject、およびこれのさまざまな化身 (ReplaySubject/AsynSubject/BehaviourSubject) をいじろうとしています。誰かがこれについて私に教えてくれませんか(おそらく私のデザインを参照して)? それとも、これは単に理想的ではありませんIObservableか?

PS。私の「誤解」を自由に修正してください:)

4

3 に答える 3

9

Observable はデータ ストリームを表すのに最適であるため、イベントは、またはDataReceivedのような Observable パターンにうまくモデル化されます。また、便利なとの追加の利点も得られます。IObservable<byte>IObservable<byte[]>OnErrorOnComplete

それを実装するという点では、正確なシナリオについて言うのは難しいですがSubject<T>、基本的なソースとして使用し、OnNextデータをプッシュするように呼び出すことがよくあります。多分何かのような

// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();

private void OnPort_DataReceived(object sender, EventArgs e)
{
    // This pushes the data to the IObserver, which is probably just a wrapper
    // around your subscribe delegate is you're using the Rx extensions
    this.subject.OnNext(port.Data); // pseudo code 
}

次に、プロパティを介してサブジェクトを公開できます。

public IObservable<byte> DataObservable
{
    get { return this.subject; } // Or this.subject.AsObservable();
}

DataReceivedイベントを にIDataIO置き換えて、IObservable<T>各戦略クラスに必要な方法でデータを処理させ、 にプッシュすることができSubject<T>ます。

反対に、Observable をサブスクライブする人は誰でも、( を使用するだけで) イベントのようにそれを処理するか、、、 などAction<byte[]>を使用してストリーム上で非常に便利な作業を実行できます。SelectWhereBuffer

private IDataIO dataIo = new ...

private void SubscribeToData()
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList<byte> bytes)
{
    // do stuff
}

ReplaySubject/ConnectableObservableは、サブスクライバーがパーティーに遅れて到着することがわかっているが、すべてのイベントに追いつく必要がある場合に最適です。ソースは、プッシュされたすべてをキャッシュし、各サブスクライバーのすべてを再生します。それが実際に必要な動作であるかどうかを判断できるのはあなただけです (ただし、メモリ使用量が明らかに増加するすべてのものをキャッシュするので注意してください)。

Rx について学んでいたとき、Rxに関するhttp://leecampbell.blogspot.co.uk/ブログ シリーズが理論を理解するのに非常に有益であることがわかりました (投稿は現在少し古く、API が変更されているので注意してください)。 )

于 2012-06-25T13:48:03.467 に答える
5

これは間違いなくオブザーバブルの理想的なケースです。IOクラスはおそらく最も改善されます。まず、オブザーバブルを使用するようにインターフェースを変更し、結合クラスがどれほど単純になるかを見てみましょう。

public interface IDataIO
{
    //you will have to fill in the types here.  Either the event args
    //the events provide now or byte[] or something relevant would be good.
    IObservable<???> DataReceived;
    IObservable<???> Timeout;
    IObservable<???> Transmit;
}

public class IO : IDataIO
{
    public SerialIO Serial;
    public UdpIO Udp;
    public TcpIO Tcp;

    public IObservable<???> DataReceived
    {
        get 
        {
            return Observable.Merge(Serial.DataReceived,
                                    Udp.DataReceived,
                                    Tcp.DataReceived);
        }
    }

    //similarly for other two observables
}

補足:インターフェイス メンバー名を変更したことに気付くかもしれません。.NET では、通常、イベントに名前が付けられ、イベントを発生さ<event name>せる関数が呼び出されOn<event name>ます。

プロデュース クラスについては、実際のソースに応じていくつかのオプションがあります。で .NET SerialPort クラスを使用していてSerialIO、 をDataReceived返すとしますIObservable<byte[]>。SerialPort には受信したデータのイベントが既にあるため、それを直接使用して、必要なオブザーバブルを作成できます。

public class SerialIO : IDataIO
{
    private SerialPort _port;

    public IObservable<byte[]> DataRecived
    {
        get
        {
            return Observable.FromEventPattern<SerialDataReceivedEventHandler,
                                               SerialDataReceivedEventArgs>(
                        h => _port.DataReceived += h,
                        h => _port.DataReceived -= h)
                   .Where(ep => ep.EventArgs.EventType == SerialData.Chars)
                   .Select(ep =>
                           {
                              byte[] buffer = new byte[_port.BytesToRead];
                              _port.Read(buffer, 0, buffer.Length);
                              return buffer;
                           });
        }
    }
}

既存のイベント ソースがない場合は、RichK が提案したようにサブジェクトを使用する必要がある場合があります。彼の答えはその使用パターンを非常によくカバーしているので、ここでは複製しません。

このインターフェイスの使用方法は示されていませんが、ユース ケースによっては、これらのクラスの他の関数がIObservables 自体を返し、これらの「イベント」を完全に排除する方が理にかなっている場合があります。イベントベースの非同期パターンでは、作業をトリガーするために呼び出す関数とは別のイベントを用意する必要がありますが、オブザーバブルを使用すると、代わりに関数からイベントを返して、サブスクライブしているものをより明確にすることができます。このアプローチでは、各呼び出しから返されたオブザーバブルを送信OnErrorし、OnCompletedメッセージで操作の終了を通知することもできます。結合クラスの使用に基づいて、これがこの特定のケースで役立つとは思いませんが、覚えておく必要があります。

于 2012-06-26T04:03:44.613 に答える
0

イベントの代わりに IObservable を使用する

プロパティの変更のみに関心がある場合、nuget パッケージrxxには次のものがあります。

IObservable<string> obs=Observable2.FromPropertyChangedPattern(() => obj.Name)

(他の多くの方法とともに)


または、イベントがプロパティの変更を妨げている場合 / INotifyPropertyChanged の実装を回避しようとしている場合

class ObserveEvent_Simple
{
    public static event EventHandler SimpleEvent;
    static void Main()
    {          
       IObservable<string> eventAsObservable = Observable.FromEventPattern(
            ev => SimpleEvent += ev,
            ev => SimpleEvent -= ev);
    }
}

http://rxwiki.wikidot.com/101samples#toc6の u/Gideon Engelberth に似ています

https://rehansaeed.com/reactive-extensions-part2-wrapping-events/でカバー


また、このコードプロジェクトの記事は、イベントからリアクティブ イベントへの変換に専念しています。

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

また、弱いサブスクリプションにも対処します

于 2017-10-14T19:34:55.167 に答える