2

リアクティブ拡張を使用すると、同じオブザーバブルに 2 回サブスクライブするのは簡単です。オブザーバブルで新しい値が利用可能になると、両方のサブスクライバーがこの同じ値で呼び出されます。

このオブザーバブルから各サブスクライバーに異なる値 (次の値) を取得させる方法はありますか?

私が求めているものの例:
ソース シーケンス: [1,2,3,4,5,...] (無限)
ソースは未知の速度で常に新しいアイテムを追加しています。
N サブスクライバーを使用して、アイテムごとに長時間の非同期アクションを実行しようとしています。

1 番目の加入者: 1,2,4,...
2 番目の加入者: 3,5,...
...
または
1 番目の加入者: 1,3,...
2 番目の加入者: 2,4,5,...
.. .
または
1 番目の加入者: 1,3,5,...
2 番目の加入者: 2,4,6,...

4

2 に答える 2

1

私はアスティに同意します。

Rx を使用してキュー (ブロッキング コレクション) にデータを入力し、競合するコンシューマーにキューから読み取らせることができます。このようにして、1 つのプロセスが何らかの理由でより高速である場合、まだビジーである場合、他のコンシューマーよりも前に次のアイテムを取得する可能性があります。

ただし、それを行いたい場合は、良いアドバイスに反して:)、各要素のインデックスを提供する Select 演算子を使用できます。その後、それをサブスクライバーに渡すことができ、サブスクライバーはモジュラスに合わせることができます。(うん! 漏れのある抽象化、マジック ナンバー、ブロッキングの可能性、ソース シーケンスへの潜在的な副作用など)

var source = Obserservable.Interval(1.Seconds())
  .Select((i,element)=>{new Index=i, Element=element});

var subscription1 = source.Where(x=>x.Index%2==0).Subscribe(x=>DoWithThing1(x.Element));
var subscription2 = source.Where(x=>x.Index%2==1).Subscribe(x=>DoWithThing2(x.Element));

また、ブロックしている場合に OnNext ハンドラーで行われた作業は、それがオンになっているスケジューラーを引き続きブロックすることにも注意してください。これは、ソース/プロデューサーの速度に影響を与える可能性があります。Asti の回答がより良い選択肢であるもう 1 つの理由です。

それが明確でない場合は尋ねてください:-)

于 2012-02-06T18:39:31.213 に答える
1

どうですか:

IObservable<TRet> SomeLengthyOperation(T input) 
{
    return Observable.Defer(() => Observable.Start(() => {
        return someCalculatedValueThatTookALongTime;
    }, Scheduler.TaskPoolScheduler));
}

someObservableSource
    .SelectMany(x => SomeLengthyOperation(input))
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

同時操作の数を制限することもできます:

someObservableSource
    .Select(x => SomeLengthyOperation(input))
    .Merge(4 /* at a time */)
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

Merge(4) が機能することは重要です。 SomeLengthyOperation によって返されるObservable がCold Observable であることが重要です。これは Defer がここで行うことです。これにより、誰かが購読するまで Observable.Start が発生しなくなります。

于 2012-02-07T21:02:02.373 に答える