13

私は最近、Reactive Framework を使っていくつかの作業を行っており、これまでのところ完全に気に入っています。サーバー操作をクリーンアップするために、従来のポーリング メッセージ キューをフィルター処理された IObservable に置き換えることを検討しています。古い方法では、サーバーに着信するメッセージを次のように処理しました。

// Start spinning the process message loop
   Task.Factory.StartNew(() =>
   {
       while (true)
       {
           Command command = m_CommandQueue.Take();
           ProcessMessage(command);
       }
   }, TaskCreationOptions.LongRunning);

その結果、クライアントからのコマンドを ProcessMessage メソッドに委任する継続的なポーリング スレッドが生成されます。このメソッドには、コマンドのタイプを決定し、そのタイプに基づいて作業を委任する一連の if/else-if ステートメントがあります。

これを、次のコードを記述した Reactive を使用するイベント駆動型システムに置き換えます。

 private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
 private IObservable<BesiegedMessage> m_MessagePublisher;

 m_MessagePublisher = m_MessageQueue
       .GetConsumingEnumerable()
       .ToObservable(TaskPoolScheduler.Default);

        // All generic Server messages (containing no properties) will be processed here
 IDisposable genericServerMessageSubscriber = m_MessagePublisher
       .Where(message => message is GenericServerMessage)
       .Subscribe(message =>
       {
           // do something with the generic server message here
       }

私の質問は、これは機能しますが、このような IObservable のバッキングとしてブロッキング コレクションを使用することは良い習慣ですか? Take() がこのように呼び出される場所がわかりません。メッセージが処理された後、メッセージが削除されずにキューに積み重なると思いますか?

これらのメッセージを取得するフィルタリングされた IObservables を駆動するバッキング コレクションとして Subjects を調べた方が効率的でしょうか? このシステムのアーキテクチャに役立つ可能性のある、ここで欠けているものは他にありますか?

4

4 に答える 4

8

これは、Visual Studio 2012 でテストされた完全に機能する例です。

  1. 新しい C# コンソール アプリを作成します。
  2. プロジェクトを右クリックし、[NuGet パッケージの管理] を選択して、[Reactive Extensions - Main Library] を追加します。

次の C# コードを追加します。

using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace DemoRX
{
    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<string> myQueue = new BlockingCollection<string>();
            {                
                IObservable<string> ob = myQueue.
                  GetConsumingEnumerable().
                  ToObservable(TaskPoolScheduler.Default);

                ob.Subscribe(p =>
                {
                    // This handler will get called whenever 
                    // anything appears on myQueue in the future.
                    Console.Write("Consuming: {0}\n",p);                    
                });
            }
            // Now, adding items to myQueue will trigger the item to be consumed
            // in the predefined handler.
            myQueue.Add("a");
            myQueue.Add("b");
            myQueue.Add("c");           
            Console.Write("[any key to exit]\n");
            Console.ReadKey();
        }
    }
}

コンソールに次のように表示されます。

[any key to exit]
Consuming: a
Consuming: b
Consuming: c

RX を使用することの本当に優れた点は、LINQ の全機能を使用して不要なメッセージを除外できることです。たとえば、.Where「a」でフィルタリングする句を追加し、何が起こるかを観察します。

ob.Where(o => (o == "a")).Subscribe(p =>
{
    // This will get called whenever something appears on myQueue.
    Console.Write("Consuming: {0}\n",p);                    
});

哲学的ノート

専用スレッドを起動してキューをポーリングするよりもこの方法の利点は、プログラムの終了後にスレッドを適切に破棄することを心配する必要がないことです。これは、IDisposable や CancellationToken を気にする必要がないことを意味します (これは、BlockingCollection を処理するときに常に必要です。そうしないと、終了を拒否するスレッドでプログラムが終了時にハングする可能性があります)。

信じてください。BlockingCollection から発生するイベントを使用する完全に堅牢なコードを作成するのは、あなたが考えるほど簡単ではありません。上記のように、よりクリーンで堅牢で、コードが少なく、LINQ を使用してフィルター処理できる RX メソッドを使用することを好みます。

レイテンシー

この方法の速さに驚きました。

私の Xeon X5650 @ 2.67Ghz では、1000 万のイベントを処理するのに 5 秒かかります。これは、イベントあたり約 0.5 マイクロ秒になります。アイテムを BlockingCollection に入れるのに 4.5 秒かかったので、RX はそれらを取り出して、入ってくるのとほぼ同じ速さで処理していました。

ねじ切り

私のすべてのテストで、RX はキュー上のタスクを処理するために 1 つのスレッドのみをスピンアップしました。

これは、非常に優れたパターンがあることを意味します。RX を使用して複数のスレッドから着信データを収集し、それらを共有キューに配置してから、単一のスレッドでキューの内容を処理できます (定義上、スレッド セーフです)。

このパターンは、キューを介してデータのプロデューサーとコンシューマーを分離することにより、マルチスレッド コードを処理する際の膨大な問題を解消します。この場合、プロデューサーはマルチスレッドであり、コンシューマーはシングルスレッドであるため、スレッドセーフです。これが、Erlang を非常に堅牢にしているコンセプトです。このパターンの詳細については、Multi-threading madely simpleを参照してください。

于 2014-04-26T21:30:43.447 に答える
1

私はこの文脈では使用BlockingCollectionしていません - だから私は「推測」しています - あなたはそれを実行して承認、反証する必要があります.

BlockingCollectionここで事態をさらに複雑にするだけかもしれません (またはほとんど役に立たないかもしれません)。Jon からのこの投稿を見てください- 簡単に確認してください。GetConsumingEnumerable「加入者ごと」の列挙可能なものを提供します。最終的にそれらを使い果たす-Rxで念頭に置いておくべきこと.

また、IEnumerable<>.ToObservable「ソース」をさらに平坦化します。それが機能すると(ソースを検索できます-何よりもRxを使用することをお勧めします)-各サブスクライブは独自の「列挙子」を作成します-したがって、すべてが独自のバージョンのフィードを取得します. このような Observable シナリオでそれがどのように機能するのか、私にはよくわかりません。

とにかく-アプリ全体のメッセージを提供したい場合-IMOSubjectは、他の形式(公開など)で紹介または述べる必要があります。その意味では、BlockingCollection が役立つとは思いませんが、自分で試してみることをお勧めします。

注(哲学的なもの)

メッセージの種類を組み合わせたり、さまざまなソースを組み合わせたりする場合 (より「現実的な」シナリオなど) は、より複雑になります。そして、それは私が言わなければならない非常に興味深いものになります。

それらを単一共有ストリームに「根付かせる」ことに注意してください(そして、Jerが正しく提案したことは避けてください)。

を使用して回避しようとしないことをお勧めしますSubject。あなたが必要としているのは、それがあなたの友人です - すべての非状態関連の議論やサブジェクトがどれほど悪いものであっても - あなたは効果的に状態を持っています (そしてあなたには「状態」が必要です) - Rx は「事後」に作動するので、あなたは関係なく、それから利益を享受してください。

その結果が気に入っているので、そのようにすることをお勧めします。

于 2013-04-02T22:08:23.130 に答える
-1

ここでの問題は、キュー (特に BlockingCollection を使用している場合は、通常、1 つのコンシューマーによる破壊的な読み取りに関連付けられます) をブロードキャスト (現在リッスンしているすべての人に送信) に変えたことです。

これらは 2 つの相反する考えのようです。

私はこれが行われたのを見たことがありますが、「間違った質問に対する正しい解決策」であったため、破棄されました。

于 2013-04-09T11:48:29.213 に答える