3

Reactive Framework の使用方法を学び始めたばかりで、複数のサブスクライバーにマルチキャスト パブリッシュできることに苦労しています。

次のようにすべてが正常に機能していました。

m_MessagePublisher = m_ServerClient.MessageQueue
      .GetConsumingEnumerable()
      .ToObservable(TaskPoolScheduler.Default);

var genericServerMessageSubscriber = m_MessagePublisher
      .Where(message => message is GenericServerMessage)
      .Subscribe(message =>
      {
          // do something here
      }

しかし、これはマルチキャストをサポートしていないことに気付きました。別のサブスクライバーをアタッチしようとすると、同じメッセージがヒットするはずでしたが、起動しませんでした。私は .MultiCast 拡張機能を読み込んでいて、サブジェクトがこれにどのように関与するかを理解しようとしていますが、まだ機能させることができませんでした:

var subject = new Subject<BesiegedMessage>();

var messagePublisher = m_ServerClient.MessageQueue
      .GetConsumingEnumerable()
      .ToObservable(TaskPoolScheduler.Default)
      .Multicast(subject);

// All generic server messages are handled here
var genericServerMessageSubscriber = subject
      .Where(message => message is GenericServerMessage)
      .Subscribe(message =>
      {
            // do something here
      }

しかし今では、以前は問題なく機能していた 1 つのサブスクライバーを含め、どのサブスクライバーもヒットしていません。複数のサブスクライバーに適切にマルチキャストできるようにするために、ここで何が欠けていますか?

更新: Multicast(subject) の代わりに Subscribe(subject) を使用すると、マルチキャストに機能しているように見えるため、 .MultiCast() の目的について非常に混乱しています

4

1 に答える 1

2

編集:

ハハ-読むのが速すぎるのは私にぴったりです-あなたが求めていることはずっと簡単です...とはいえ、以下は重要だと思うので、そのままにしておきます...だから、あなたの問題-この行を追加してみてください:

var messagePublisher = m_ServerClient.MessageQueue
  .GetConsumingEnumerable()
  .ToObservable(TaskPoolScheduler.Default)
  .Multicast(subject)
  // Here: connectable observables are a PITA...
  .RefCount();

編集終了:

うーん...どのように説明するかMulticast...例を挙げてみましょう:

あなたがこのようなものを持っているとしましょう - それは何を生み出すと思いますか?

int delay = 100;
var source = Observable.Interval(TimeSpan.FromMilliseconds(delay));
var publishingFrontend = new Subject<string>();

// Here's "raw"
var rawStream = source;
using(rawStream.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(rawStream.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

ストリーム raw をサブスクライブしているため、新しいサブスクライバーは基本的にゼロから始めます。

(私は でおかしな方法を取ったので、再実行しても 100% 一致しませんが、Thread.Sleep近いはずです)

0
1
2
Inner: 0
3
Inner: 1
4
5
6
7
8
9

うーん...つまり、「ストリームの途中で結びたい」場合は、次のPublish().RefCount()パターンを使用します。

var singleSource = source.Publish().RefCount();
using(singleSource.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(singleSource.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

次のようなものが生成されます。

0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

では、演算子がないとしましょう。Publish()どのようにシミュレートできますか?

Console.WriteLine("Simulated Publish:");
// use a subject to proxy values...
var innerSubject = new Subject<long>();
// wire up the source to "write to" the subject
var innerSub = source.Subscribe(innerSubject);
var simulatedSingleSource = Observable.Create<long>(obs =>
{
    // return subscriptions to the "proxied" subject
    var publishPoint = innerSubject.Subscribe(obs);        
    return publishPoint;
});

これを実行すると、次のようになります。

Simulated Publish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

ウット!

しかし、別の方法があります...

Console.WriteLine("MulticastPublish:");
var multicastPublish = source.Multicast(new Subject<long>()).RefCount();    
using(multicastPublish.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(multicastPublish.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

出力:

MulticastPublish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

編集:

実際、生成するすべての拡張機能は/ペアリングConnectableObservableに依存しています。MulticastSubject

Publish() => Multicast(new Subject<T>)
Replay() => Multicast(new ReplaySubject<T>)
PublishLast() => Multicast(new AsyncSubject<T>)
于 2013-04-03T01:35:31.000 に答える