私は最近、Reactive Framework を使っていくつかの作業を行っており、これまでのところ完全に気に入っています。サーバー操作をクリーンアップするために、従来のポーリング メッセージ キューをフィルター処理された IObservable に置き換えることを検討しています。古い方法では、サーバーに着信するメッセージを次のように処理しました。
// Start spinning the process message loop
Task.Factory.StartNew(() =>
{
while (true)
{
Command command = m_CommandQueue.Take();
ProcessMessage(command);
}
}, TaskCreationOptions.LongRunning);
その結果、クライアントからのコマンドを ProcessMessage メソッドに委任する継続的なポーリング スレッドが生成されます。このメソッドには、コマンドのタイプを決定し、そのタイプに基づいて作業を委任する一連の if/else-if ステートメントがあります。
これを、次のコードを記述した Reactive を使用するイベント駆動型システムに置き換えます。
private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
private IObservable<BesiegedMessage> m_MessagePublisher;
m_MessagePublisher = m_MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default);
// All generic Server messages (containing no properties) will be processed here
IDisposable genericServerMessageSubscriber = m_MessagePublisher
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something with the generic server message here
}
私の質問は、これは機能しますが、このような IObservable のバッキングとしてブロッキング コレクションを使用することは良い習慣ですか? Take() がこのように呼び出される場所がわかりません。メッセージが処理された後、メッセージが削除されずにキューに積み重なると思いますか?
これらのメッセージを取得するフィルタリングされた IObservables を駆動するバッキング コレクションとして Subjects を調べた方が効率的でしょうか? このシステムのアーキテクチャに役立つ可能性のある、ここで欠けているものは他にありますか?