7

ここに示す RX ビルダーのわずかに変更されたバージョンを使用しています。

http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html

直接取得するのではなく、IObservable<'T>私の計算式には次のタイプがあります。

type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)

let extract (MyType t) = t

コンビネータは次の形式を取ります。

let where (f: 'b -> bool) (m:MyType<_,'b>) = MyType(fun input -> (extract m input).Where(f))

式自体の中で、ストリームに供給された以前の値を参照する必要があることがよくあります。そのために、最新の値MyTypeのローリング不変リストを維持する を定義しました。n

let history n = 
    MyType(fun input ->
        Observable.Create(fun (o:IObserver<_>) ->
            let buffer = new History<_>(n)
            o.OnNext(HistoryReadOnly(buffer))
            input.Subscribe(buffer.Push, o.OnError, o.OnCompleted)
        )
    )

これで、次のようなことができるようになりました。

let f = obs {
    let! history = history 20
    // Run some other types, and possibly do something with history
}

私はこの履歴をかなり頻繁に使用していることに気付きました。理想的には、これを に直接埋め込みたいと思いIObservable<'a>ます。明らかに、私はそれを行うことはできません。だから私の質問は、私がここに持っているこの歴史の概念を導入する合理的な方法は何かということです. 拡張する必要がありIObservable<'T>ますか (その方法がわからない) IObservable<'T>、?

提案をいただければ幸いです。

編集: 完全なサンプル コードを追加しました。

open System
open System.Collections.Generic
open System.Reactive.Subjects
open System.Reactive.Linq

// Container function
type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)
let extract (MyType t) = t

// Mini Builder
let internal mbind (myTypeB:MyType<'a,'b>) (f:'b -> MyType<'a,'c>) = 
    MyType(fun input ->
        let obsB = extract myTypeB input
        let myTypeC= fun resB -> extract (f resB) input
        obsB.SelectMany(myTypeC)
    )

type MyTypeBuilder() = 
    member x.Bind (m,f) = mbind m f
    member x.Combine (a,b) = MyType(fun input -> (extract a input).Concat(extract b input))
    member x.Yield (r) = MyType(fun input -> Observable.Return(r))
    member x.YieldFrom (m:MyType<_,_>) = m
    member x.Zero() = MyType(fun input -> Observable.Empty())
    member x.Delay(f:unit -> MyType<'a,'b>) = f() 

let mtypeBuilder = new MyTypeBuilder()

// Combinators
let simplehistory = 
    MyType(fun input ->
        Observable.Create(fun (o:IObserver<_>) ->
            let buffer = new List<_>()
            o.OnNext(buffer)
            input.Subscribe(buffer.Add, o.OnError, o.OnCompleted)
        )
    )

let where (f: 'b -> bool) m = MyType(fun input -> (extract m input).Where(f))
let take (n:int) m = MyType(fun input -> (extract m input).Take(n))
let buffer m = MyType(fun input -> (extract m input).Buffer(1))
let stream = MyType(id)

// Example
let myTypeResult (t:MyType<'a,'b>) (input:'a[]) = (extract t (input.ToObservable().Publish().RefCount())).ToArray().Single()

let dat = [|1 .. 20|]

let example = mtypeBuilder {
    let! history = simplehistory
    let! someEven = stream |> where(fun v -> v % 2 = 0) // Foreach Even
    let! firstValAfterPrevMatch = stream |> take 1 // Potentially where a buffer operation would run, all values here are after i.e. we cant get values before last match
    let! odd = stream |> where(fun v -> v % 2 = 1) |> take 2 // Take 2 odds that follow it
    yield (history.[history.Count - 1], history.[0], someEven,firstValAfterPrevMatch, odd) // Return the last visited item in our stream, the very first item, an even, the first value after the even and an odd
}

let result = myTypeResult example dat

val result : (int * int * int * int * int) [] =
  [|(5, 1, 2, 3, 5); (7, 1, 2, 3, 7); (7, 1, 4, 5, 7); (9, 1, 4, 5, 9);
    (9, 1, 6, 7, 9); (11, 1, 6, 7, 11); (11, 1, 8, 9, 11); (13, 1, 8, 9, 13);
    (13, 1, 10, 11, 13); (15, 1, 10, 11, 15); (15, 1, 12, 13, 15);
    (17, 1, 12, 13, 17); (17, 1, 14, 15, 17); (19, 1, 14, 15, 19);
    (19, 1, 16, 17, 19)|]
4

3 に答える 3

1

Scan申し訳ありませんが、私の F# は非常に錆びていますが、演算子を探しているのかもしれません。ソースが生成する値をアキュムレータにプッシュし、このアキュムレータを使用してプロジェクションの次の値を生成できます。

ここでは (C# では申し訳ありませんが)、これらの値を 100 ミリ秒間隔で生成する [0..10] のシーケンスを取得し、実行中の Sum を返します。

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
                       .Take(10);

source.Scan(
    new List<long>(),               //Seed value
    (acc, value)=>                  //Accumulator function
        {
            acc.Add(value);
            return acc;
        }
    )
    .Select(accumate=>accumate.Sum())

値 [0,1,3,6,10,15,21,28,36,45] を 100 ミリ秒間隔で生成します。

このツールを使用すると、値の履歴を (履歴/アキュムレータに追加することで) 管理し、選択でこの履歴を使用して適切な値を投影できると思います。

于 2013-08-08T12:51:23.410 に答える