0

RXを使用するときに使用可能な演算子を拡張する適切な方法は何ですか?

役に立つと思う操作をいくつか作り上げたいと思います。

最初の操作は、単に系列の標準偏差です。

2番目の操作はn番目のラグ値です。つまり、2が遅れており、シリーズがABCDEFの場合、FがプッシュされるとラグはDになり、Aがプッシュされるとラグはヌル/空になります。Bがプッシュされるとラグはヌル/空になります。 Cが押されると、ラグはAになります

これらのタイプの演算子をrx.codeplex.comの組み込みに基づいて作成するのは理にかなっていますか、それとももっと簡単な方法がありますか?

4

3 に答える 3

1

これらのいくつかは(いつものように)他のものより簡単です。(時間ではなく)カウントによる「ラグ」の場合、「ラグ」のサイズに相当するObservable.Bufferを使用してスライディングウィンドウを作成し、結果リストの最初の要素を取得します。

これまでのところ、ラグ= 3、関数は次のとおりです。

obs.Buffer(3,1).Select(l => l.[0])

これは、拡張関数に変換するのは非常に簡単です。同じリストを再利用するという点で効率的かどうかはわかりませんが、ほとんどの場合、それは問題ではありません。F#が必要なことはわかっていますが、翻訳は簡単です。

実行中の集計の場合、通常はObservable.Scan「実行中」の値を取得するために使用できます。これは、これまでに見られたすべての値に基づいて計算されます(実装は非常に簡単です)。つまり、後続の各要素を実装する必要があるのは、前の集計と新しい要素だけです。

何らかの理由でスライディングウィンドウに基づく実行中のアグリゲートが必要な場合は、より困難なドメインに入ります。ここでは、最初にスライディングウィンドウを提供できる操作が必要です。これは上記のBufferで説明されています。ただし、このウィンドウから削除された値と追加された値を知る必要があります。

そのため、既存のウィンドウ+新しい値に基づいて内部ウィンドウを維持し、新しいウィンドウ+削除された値+付加価値を返す新しいObservable関数をお勧めします。これは、Observable.Scanを使用して記述できます(効率的な実装のために内部キューをお勧めします)。新しい値を指定して削除する値を決定する関数を使用する必要があります(このようにして、時間またはカウントでスライドするようにパラメーター化できます)。

その時点で、Observable.Scanを再度使用して、古い集計+ウィンドウ+削除された値+付加価値を取得し、新しい集計を提供できます。

これがお役に立てば幸いです、私はそれがたくさんの言葉であることを理解しています。要件を確認できれば、その特定のユースケースの実際の拡張方法をサポートできます。

于 2012-12-06T06:32:10.000 に答える
1

慣用的なRxでは、任意の遅延はで構成できますZip

let lag (count : int) o = 
    let someo = Observable.map Some o
    let delayed = Observable.Repeat(None, count).Concat(someo)        
    Observable.Zip(someo, delayed, (fun c d -> d))    

ローリングバッファの場合、最も効率的な方法は、固定サイズのQueue/を使用することです。ResizeArray

let rollingBuffer (size : int) o = 
    Observable.Create(fun (observer : IObserver<_>) -> 
    let buffer = new Queue<_>(size)
    o |> Observable.subscribe(fun v -> 
            buffer.Enqueue(v)
            if buffer.Count = size then
                observer.OnNext(buffer.ToArray())
                buffer.Dequeue() |> ignore
        )
    )

の場合numbers |> rollingBuffer 3 |> log

seq [0L; 1L; 2L]
seq [1L; 2L; 3L]
seq [2L; 3L; 4L]
seq [3L; 4L; 5L]
...

隣接する値をペアリングするには、次を使用できます。Observable.pairwise

let delta (a, b) = b - a
let deltaStream = numbers |>  Observable.pairwise |> Observable.map(delta) 

Observable.Scanローリング計算を適用する場合は、より簡潔になります。

于 2012-12-06T17:30:33.570 に答える
0

の場合lag、次のようなことができます

module Observable =
  let lag n obs =
    let buf = System.Collections.Generic.Queue()
    obs |> Observable.map (fun x ->
      buf.Enqueue(x)
      if buf.Count > n then Some(buf.Dequeue())
      else None)

これ:

Observable.Range(1, 9) 
  |> Observable.lag 2 
  |> Observable.subscribe (printfn "%A") 
  |> ignore

プリント:

<null>
<null>
Some 1
Some 2
Some 3
Some 4
Some 5
Some 6
Some 7
于 2012-12-06T15:22:11.867 に答える