ReactiveExtensionsは初めてです。
私は部屋ベースのMMOFPSゲームサーバーを持っており、多くの部屋と1つのリスニングソケットしかありません。ネットワークから受信したメッセージを表すコールドオブザーバブルを作成し、それをホットに変換して複数の部屋で共有し、各部屋が関連するメッセージをフィルタリングして処理できるようにします。私のアプローチは正しいですか?
もう1つの問題は、コールドオブザーバブルをホットに変換した後、SubscribeOnがその効果を失ったことに気づきました。例えば:
var observable = Observable.Return(1).Publish();
observable
.Where(
x =>
{
Console.WriteLine("Filter on {0}", Thread.CurrentThread.ManagedThreadId);
return true;
})
.SubscribeOn(Scheduler.NewThread)
.ObserveOn(Scheduler.NewThread)
.Subscribe(x => Console.WriteLine("Received on {0}", Thread.CurrentThread.ManagedThreadId));
observable.Connect();
Console.WriteLine("End {0}", Thread.CurrentThread.ManagedThreadId);
Console.ReadLine();
結果:
12で受信End1010で
フィルター
公開なし:
結果:12で受信した11で10のフィルターを終了
しかし、Publish.RefCountを使用して自動接続すると、期待どおりに機能します。
私は何かが足りないのですか?..。