10

現在のバッチを処理している間に入ってくるすべてのアイテムが次のバッチに表示されるように、イベントを受け取ってバッチで処理したいプログラムがあります。

Rx の単純な TimeSpan およびカウント ベースの Buffer メソッドは、入ってきたすべてのものの 1 つの大きなバッチを提供する代わりに、アイテムの複数のバッチを提供します (サブスクライバーが指定された TimeSpan よりも長くかかる場合、または N 個を超えるアイテムが入ってきて、 N は count より大きい)。

Func<IObservable<TBufferClosing>>またはIObservable<TBufferOpening> と Func<TBufferOpening, IObservable<TBufferClosing>>を取るより複雑な Buffer オーバーロードの使用を調べましたが、これらの使用方法の例を見つけることができません。私がやろうとしていることにそれらを適用する方法。

4

4 に答える 4

2

これはあなたが望むことをしますか?

var xs = new Subject<int>();
var ys = new Subject<Unit>();

var zss =
    xs.Buffer(ys);

zss
    .ObserveOn(Scheduler.Default)
    .Subscribe(zs =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(String.Join("-", zs));
        ys.OnNext(Unit.Default);
    });

ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);

私の結果:

1-2-3
4-5
6-7
于 2012-11-27T21:38:50.233 に答える
1

以前私がこれを行った方法は、DotPeek/Reflector で ObserveOn メソッドをプルアップし、そのキューイングの概念を採用して、要件に適合させることでした。たとえば、刻々と変化するデータ (金融など) が高速な UI アプリケーションでは、UI スレッドがイベントであふれ、十分な速さで更新できない場合があります。このような場合、(特定のインストゥルメントの) 最後のイベントを除くすべてのイベントを削除したいと考えています。この場合、ObserveOn の内部キューを単一の値 T に変更しました (ObserveLatestOn(IScheduler) を探します)。あなたの場合、キューが必要ですが、最初の値だけでなくキュー全体をプッシュしたいと考えています。これで始められるはずです。

于 2012-12-10T07:54:04.877 に答える
1

必要なのは、値をバッファリングするものであり、ワーカーの準備が整ったら、現在のバッファを要求してからリセットします。これは、RX とタスクの組み合わせで実行できます。

class TicTac<Stuff> {

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>();

    List<Stuff> in = new List<Stuff>();

    public void push(Stuff stuff){
        lock(this){
            if(in == null){
                in = new List<Stuff>();
                Items.SetResult(in);
            }
            in.Add(stuff);
        }
    }

    private void reset(){
        lock(this){
            Items = new TaskCompletionSource<List<Stuff>>();
            in = null;
        }
    }

    public async Task<List<Stuff>> Items(){
        List<Stuff> list = await Items.Task;
        reset();
        return list;
    }
}

それから

var tictac = new TicTac<double>();

IObservable<double> source = ....

source.Subscribe(x=>tictac.Push(x));

次に、ワーカーで

while(true){

    var items = await tictac.Items();

    Thread.Sleep(100);

    for each (item in items){
        Console.WriteLine(item);
    }

}
于 2012-11-28T09:04:26.200 に答える