2

監視可能な一連のイベントオブジェクトと、特定のタイプのイベントを処理する多数のオブザーバーがあります。次のシナリオを実行する必要があります。

  1. 一部のイベントタイプは、条件に一致する最初のオブザーバーによって処理される必要があり(たとえば、observable.SubscribeExclusively(x => {})、他のイベントタイプに対して「監視不能」になります。
  2. サブスクリプションがない場合は、アイテムが失われないようにデフォルトのハンドラー(たとえば、observable.SubscribeIfNoSubscriptions(x => {}))を設定します(このハンドラーは、たとえば、後で処理されるようにアイテムをデータベースに保存する場合があります)。

Rxでこれらのケースを処理する方法はありますか?

4

2 に答える 2

1

「排他性」の方が簡単です。排他的オブザーバーのフィルター処理された出力を他のすべての人にサブスクライブさせるだけです。

「デフォルト」はより難しい-RXプログラミングは関数型プログラミングであり、サブスクライバーはお互いを知りませんが、定義上、「デフォルト」サブスクライバーを持つことは、オブザーバー間で共有される状態を持つことを意味します。状態を共有する1つの方法は、TPLDataFlowのConcurrentBagまたはBufferBlock使用してプロデューサー/コンシューマーキューを作成することです。もう1つの方法は、次のようなクラスを使用して、「処理済み」状態をイベント自体にアタッチすることです。

public class Handled<T>
{
    public bool IsHandled { get; set; }
    public T Data { get; set; }
}

いずれの場合も、「デフォルト」ハンドラーを使用する前に、オブザーバーに反応する時間を与える必要があります。以下のコードは、「排他的」と「デフォルト」の両方の概念を示しています。

        var source = new[] {0, 1, 2, 3, 4}.ToObservable();
        var afterExclusive = source
            .Where(x =>
                       {
                           if (x == 0)
                           {
                               Console.WriteLine("exclusive");
                               return false;
                           }
                           return true;
                       })
            .Select(x => new Handled<int> {Data = x})
            .Publish(); // publish is a must otherwise 
        afterExclusive  // we'll get non shared objects
            .Do(x => { x.IsHandled = true; })
            .Subscribe();
        afterExclusive
            .Delay(TimeSpan.FromSeconds(1))
            .Where(x => !x.IsHandled)
            .Subscribe(x => Console.WriteLine("missed by all {0}", x));
        afterExclusive.Connect(); 
于 2011-08-03T04:32:01.400 に答える
0

私はあなたのシナリオを完全に理解したかどうかはわかりませんが、これはどのようにあなたを襲いますか?

IObservable<Event> streamOfEvents.SelectMany(x => {
    if (matchesExclusiveItem1(x)) {
        x += exclusiveItem1Handler;
        return Observable.Empty<Event>();
    }

    // Default case
    return Observable.Return(x);
}).Subscribe(x => {
    // Default case
    x += defaultHandler;
});

「イベントオブジェクト」を使用しているのは、それが指定したものだからですが、おそらく使用したほうがよいでしょう。IObservable<IObservable<T>>このセレクターには副作用(イベントの接続)があり、これはあまり良くありません。

于 2011-07-22T20:16:02.143 に答える