慣用的なRxでは、任意の遅延はで構成できますZip
。
let lag (count : int) o =
let someo = Observable.map Some o
let delayed = Observable.Repeat(None, count).Concat(someo)
Observable.Zip(someo, delayed, (fun c d -> d))
ローリングバッファの場合、最も効率的な方法は、固定サイズのQueue
/を使用することです。ResizeArray
let rollingBuffer (size : int) o =
Observable.Create(fun (observer : IObserver<_>) ->
let buffer = new Queue<_>(size)
o |> Observable.subscribe(fun v ->
buffer.Enqueue(v)
if buffer.Count = size then
observer.OnNext(buffer.ToArray())
buffer.Dequeue() |> ignore
)
)
の場合numbers |> rollingBuffer 3 |> log
:
seq [0L; 1L; 2L]
seq [1L; 2L; 3L]
seq [2L; 3L; 4L]
seq [3L; 4L; 5L]
...
隣接する値をペアリングするには、次を使用できます。Observable.pairwise
let delta (a, b) = b - a
let deltaStream = numbers |> Observable.pairwise |> Observable.map(delta)
Observable.Scan
ローリング計算を適用する場合は、より簡潔になります。