「排他性」の方が簡単です。排他的オブザーバーのフィルター処理された出力を他のすべての人にサブスクライブさせるだけです。
「デフォルト」はより難しい-RXプログラミングは関数型プログラミングであり、サブスクライバーはお互いを知りませんが、定義上、「デフォルト」サブスクライバーを持つことは、オブザーバー間で共有される状態を持つことを意味します。状態を共有する1つの方法は、TPLDataFlowのConcurrentBagまたはBufferBlockを使用してプロデューサー/コンシューマーキューを作成することです。もう1つの方法は、次のようなクラスを使用して、「処理済み」状態をイベント自体にアタッチすることです。
public class Handled<T>
{
public bool IsHandled { get; set; }
public T Data { get; set; }
}
いずれの場合も、「デフォルト」ハンドラーを使用する前に、オブザーバーに反応する時間を与える必要があります。以下のコードは、「排他的」と「デフォルト」の両方の概念を示しています。
var source = new[] {0, 1, 2, 3, 4}.ToObservable();
var afterExclusive = source
.Where(x =>
{
if (x == 0)
{
Console.WriteLine("exclusive");
return false;
}
return true;
})
.Select(x => new Handled<int> {Data = x})
.Publish(); // publish is a must otherwise
afterExclusive // we'll get non shared objects
.Do(x => { x.IsHandled = true; })
.Subscribe();
afterExclusive
.Delay(TimeSpan.FromSeconds(1))
.Where(x => !x.IsHandled)
.Subscribe(x => Console.WriteLine("missed by all {0}", x));
afterExclusive.Connect();