1

私は Rx が大好きですが、常に問題に直面しています。

1 つのアップストリームシーケンスIObservable<Foo>Nダウンストリーム シーケンスが関連付けられているとします。それぞれは、いくつかの単純な述語 (たとえばfoo.bar == someKey) を満たす Foo にのみ関心があります。

もちろん、これはWhere()オペレーターにとって単純な仕事です。

IObservable<Foo> foos = ...;
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f));
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f));
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f));
...
[many more subscriptions for different bar values]

ここで本質的に起こることは、Foo生成されたアップストリームごとに、Where()述語がその時点で評価されるということFoo Nです。これは、これを必要とするすべてのサブスクライバーを見つけるための線形検索のように機能しますFoo。それはすべてうまくいき、まさにWhere()ここでの使用に期待する (べき) ものです。

私が抱えている問題は、私の場合、N非常に大きい可能性がありますが、特定のものを必要とするサブスクライバーのサブセットFooが非常に小さいことです。通常、それぞれに 1 つだけ存在しますFooFooこれは、これも伝播する必要があるいくつかの下流シーケンスを見つけるために非常に効率的なルックアップを実行できるときに、本質的に低速の線形検索を実行していることを意味します。私のアプリは非常にパフォーマンスが重要な環境で実行されており、この非効率性を許容することはできません。

これをより効率的に行うエレガントな方法を見つけようと頭を悩ませましたが、多くの状態を保存し (サブスクライバーのマッピングなど)、並行性を非常に慎重に管理する必要があるソリューションしか思いつきません。そもそもRxを使用する目的の多く。既存のオペレーターに関して、これに対処する何らかの方法を希望します。誰かが以前にこの問題に対処したことがありますか、または良い解決策を知っていますか? 詳細をお知らせいただければ幸いです。

編集

私の例は少し単純すぎたと思います。既知の範囲内の数値と照合するケースは扱っていません。N は説明目的でのみ使用されました。上記の更新された例。

4

2 に答える 2

3

Codeplex の Rx ディスカッション ボードで、Dave Sexton から素晴らしいソリューションを入手しました。

https://rx.codeplex.com/discussions/439717

PublishでGroupByまたはGroupByUntilを使用するのはどうですか?

例: (未テスト)

IConnectableObservable<IGroupedObservable<string, Foo>> foosByBar = 
    (from foo in foos
     group foo by foo.bar)
    .Publish();

foosByBar.Where(g => g.Key == "abc").Take(1).SelectMany(g => g).Subscribe(A);
foosByBar.Where(g => g.Key == "xyz").Take(1).SelectMany(g => g).Subscribe(B);
foosByBar.Where(g => g.Key == "bla").Take(1).SelectMany(g => g).Subscribe(C);

foosByBar.Connect();

GroupByは、すべてのキーに対してディクショナリ ルックアップを使用して、値がプッシュされる適切なオブザーバブルを見つけます。

パブリッシュは group-by をブロードキャストして、ディクショナリ ルックアップ操作がすべてのオブザーバーで共有されるようにします。

Where / Takeは、述語を 1 回だけ実行して適切なグループを特定し、そのグループ内のすべての値のブロードキャストを、同じキーに関心のある他のオブザーバーと共に受信します。

GroupByはIGroupedObservableを再生しないため、接続する前にすべてのサブスクリプションを設定する必要があることに注意してください。ConnectよりもRefCountを使用したい場合は、 Replay演算子を GroupByの結果に適用することを検討する必要があります。

于 2013-04-10T15:44:21.853 に答える
0

何かが状態を保存しています。今は、Wheres を介して追加したすべてのサブスクライバーを保存している観察可能なものです。これを認識しているかどうかは明らかではありませんが、foos はすべてのメッセージで各オブザーバーに通知する必要があります。ほとんどのオブザーバーが単純に述語をチェックして戻るようにするだけですが、すべてWhereのメッセージについて各述語がチェックされます。

オブザーバーとしてラップされたハンドラーのマップを構築することはそれほど難しくなく、必要なパフォーマンスの向上が得られるはずです。必要な数のハンドラーを登録し、マップをソース オブザーバブルにサブスクライブするだけです。aDictionaryが必要な一致するセマンティクスを提供しない場合は、ルックアップを減らすために他のスキームを考え出す必要がありますが、全体的な考え方は同じです。処理する入力が複数ある場合は、同じハンドラーを複数回登録でき、同じ入力に対して複数のハンドラーを登録できることに注意してください。

class ObserverMap<T> : IObserver<T>
{
    ObserverMap(Action<Exception> onError, Action onCompleted)
    {
        _onError = onError;
        _onCompleted = onCompleted;
        _handlers = new Dictionary<T, List<Action<T>>>();
    }
    ObserverMap(Action<Exception> onError, Action onCompleted, IEqualityComparer<T> comparer) 
    {
        _onError = onError;
        _onCompleted = onCompleted;
        _handlers = new Dictionary<T, List<Action<T>>>(comparer);
    }

    int _stopped;
    Dictionary<T, List<Action<T>>> _handlers;
    Action<Exception> _onError;
    Action _onCompleted;

    public void OnCompleted()
    {
        if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            if (_onCompleted != null) _onCompleted();
        }
    }

    public void OnError(Exception error)
    {
        if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            if (_onCompleted != null) _onCompleted();
        }
    }

    public void OnNext(T value)
    {
        if (_stopped != 0) return;

        List<Action<T>> match;
        if (_handlers.TryGetValue(value, out match))
        {
            foreach (var handler in match)
            {
                handler(value);
            }
        }
    }

    public IDisposable RegisterHandler(T key, Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");

        List<Action<T>> match;
        if (!_handlers.TryGetValue(key, out match))
        {
            match = new List<Action<T>>();
            _handlers.Add(key, match);
        }
        match.Add(handler);

        return System.Reactive.Disposables.Disposable.Create(() => match.Remove(handler));
    }
}
于 2013-04-09T22:52:06.883 に答える