アップデート
Jon Skeet が正しかったようです (大きな驚きです!)。問題は、Average
拡張機能が継続的な平均を提供するという私の仮定にありました (そうではありません)。
私が求めている動作のために、単純なContinuousAverage
拡張メソッドを作成しました。その実装は、同様のことを望む他の人のためにここに含めています。
public static class ObservableExtensions {
private class ContinuousAverager {
private double _mean;
private long _count;
public ContinuousAverager() {
_mean = 0.0;
_count = 0L;
}
// undecided whether this method needs to be made thread-safe or not
// seems that ought to be the responsibility of the IObservable (?)
public double Add(double value) {
double delta = value - _mean;
_mean += (delta / (double)(++_count));
return _mean;
}
}
public static IObservable<double> ContinousAverage(this IObservable<double> source) {
var averager = new ContinuousAverager();
return source.Select(x => averager.Add(x));
}
}
私は先に進んで、他の明白な候補についても上記のようなことをすることを考えContinuousCount
ています. それについて何か考えはありますか?ContinuousSum
ContinuousMin
ContinuousMax
ContinuousVariance
ContinuousStandardDeviation
元の質問
私はRx Extensionsをあちこちで少し使用していますが、基本的な考え方は理解できたと感じています。
ここで奇妙なことがあります。これを書いた場合、次のような印象を受けました。
var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");
var bids = ticks
.Where(e => e.EventArgs.Quote.HasBid)
.Select(e => e.EventArgs.Quote.Bid);
var bidsSubscription = bids.Subscribe(
b => Console.WriteLine("Bid: {0}", b)
);
var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
b => Console.WriteLine("Avg Bid: {0}", b)
);
IObservable<double>
2 つのオブジェクト (bids
とavgOfBids
)を取得します。1 つは基本的に私の からのすべての市場入札のストリームであり、もう 1 つはこれらの入札の平均MarketDataProvider
のストリームです。
だから、このようなもの:
Bid Avg Bid
1 1
2 1.5
1 1.33
2 1.5
私のavgOfBids
オブジェクトは何もしていないようです。私は何が欠けていますか?私はおそらくAverage
実際に何をすべきかを誤解していると思います。IObservable<T>
(これは、Max
、などのすべての集約のような拡張メソッドにも当てはまるようですCount
。)