3

アップデート

Jon Skeet が正しかったようです (大きな驚きです!)。問題は、Average拡張機能が継続的な平均を提供するという私の仮定にありました (そうではありません)。

私が求めている動作のために、単純なContinuousAverage拡張メソッドを作成しました。その実装は、同様のことを望む他の人のためにここに含めています。

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

        // undecided whether this method needs to be made thread-safe or not
        // seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta / (double)(++_count));
            return _mean;
        }
    }

    public static IObservable<double> ContinousAverage(this IObservable<double> source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

私は先に進んで、他の明白な候補についても上記のようなことをすることを考えContinuousCountています. それについて何か考えはありますか?ContinuousSumContinuousMinContinuousMaxContinuousVarianceContinuousStandardDeviation


元の質問

私はRx Extensionsをあちこちで少し使用していますが、基本的な考え方は理解できたと感じています。

ここで奇妙なことがあります。これを書いた場合、次のような印象を受けました。

var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

IObservable<double>2 つのオブジェクト (bidsavgOfBids)を取得します。1 つは基本的に私の からのすべての市場入札のストリームであり、もう 1 つはこれらの入札の平均MarketDataProviderのストリームです。

だから、このようなもの:

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

私のavgOfBidsオブジェクトは何もしていないようです。私は何が欠けていますか?私はおそらくAverage実際に何をすべきかを誤解していると思います。IObservable<T>(これは、Max、などのすべての集約のような拡張メソッドにも当てはまるようですCount。)

4

3 に答える 3

3

ContinousAverage を行う別の方法は、.Scan() を使用することです。

bids.Scan(new { sum = .0, count = 0 }, 
          (agg, x) => new { sum = agg.sum + x, count = agg.count + 1 })
    .Select(agg => agg.sum / agg.count)
于 2010-05-09T05:41:35.257 に答える
2

The aggregate methods don't provide rolling max/min/count/average etc - they provide a single value when the original stream completes. So for example, if you change your code to:

var avgOfBids = bids.Take(5).Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

Then once 5 bids have been made, it will display the average of them. The "average" stream will then complete too.

于 2010-05-07T20:07:59.563 に答える
0

入札イベント ストリームをサブスクライブした後。他のオブザーバーのサブスクライブを効果的にブロックしました (Subscribe は IDisposable を返します)。

ティックと並行して別の Observable を定義し、それをサブスクライブして平均化する必要があります。

var ticksToAverage = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

 var bidsToAverage = ticksToAverage
      .Where(e => e.EventArgs.Quote.HasBid)
         .Select(e => e.EventArgs.Quote.Bid);


var avgOfBids = bidsToAverage.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe( b => Console.WriteLine("Avg Bid: {0}", b)
                ); 
于 2010-05-07T20:06:21.063 に答える