11

F# Computation Expression 構文内で Reactive Extension を使用する Rx Builder を考え出そうとしています。スタックを吹き飛ばさないように修正するにはどうすればよいですか? 以下の Seq の例のように。また、Reactive Extensions の一部として、または .NET Framework の将来のバージョンの一部として、RxBuilder の実装を提供する計画はありますか?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore
4

5 に答える 5

9

簡単に言うと、Rx Frameworkは、このような再帰パターンを使用したオブザーバブルの生成をサポートしていないため、簡単に実行することはできません。F#シーケンスに使用されるCombine操作には、オブザーバブルが提供しない特別な処理が必要です。Observable.GenerateRxフレームワークは、おそらく、を使用してオブザーバブルを生成し、LINQクエリ/ F#計算ビルダーを使用してそれらを処理することを想定しています。

とにかく、ここにいくつかの考えがあります-

まず、に置き換える必要がありObservable.mergeますObservable.Concat。最初のものは両方のオブザーバブルを並行して実行し、2番目のものは最初に最初のオブザーバブルからすべての値を生成し、次に2番目のオブザーバブルから値を生成します。この変更後、スタックがオーバーフローする前に、スニペットは少なくとも800個までの数値を出力します。

スタックオーバーフローの理由は、Concat呼び出すオブザーバブルConcatを作成して、呼び出す別のオブザーバブルConcatなどを作成するためです。これを解決する1つの方法は、同期を追加することです。Windowsフォームを使用している場合はDelay、GUIスレッドで監視対象をスケジュールするように変更できます(現在のスタックは破棄されます)。これがスケッチです:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

これを適切に実装するには、独自のConcatメソッドを作成する必要がありますが、これは非常に複雑です。アイデアは次のようになります。

  • Concatはいくつかの特別なタイプを返します。IConcatenatedObservable
  • メソッドが再帰的に呼び出されると、IConcatenatedObservableその参照のチェーンが相互に作成されます
  • このConcatメソッドはこのチェーンを検索し、たとえば3つのオブジェクトがある場合、真ん中のオブジェクトをドロップします(常に最大2の長さのチェーンを維持するため)。

これはStackOverflowの回答には少し複雑すぎますが、Rxチームにとっては有益なフィードバックになる可能性があります。

于 2011-05-28T19:35:29.940 に答える
9

これは Rx v2.0 で修正されていることに注意してください (既に説明したように)。より一般的には、すべてのシーケンス演算子 (Concat、Catch、OnErrorResumeNext) と命令型演算子 (If、While など) について修正されています。

基本的に、このクラスのオペレーターは、ターミナル オブザーバー メッセージで別のシーケンスをサブスクライブするものと考えることができます (たとえば、現在のシーケンスの OnCompleted メッセージを受信すると、Concat は次のシーケンスをサブスクライブします)。ここで、末尾再帰の類推が役立ちます。

Rx v2.0 では、すべての末尾再帰サブスクリプションがキューのようなデータ構造にフラット化され、一度に 1 つずつ処理され、ダウンストリーム オブザーバーと通信します。これにより、連続するシーケンス サブスクリプションでオブザーバーが互いに話し合う際の無限の成長が回避されます。

于 2012-09-05T17:00:55.657 に答える
4

これは、Rx2.0Betaで修正されています。そして、これがテストです。

于 2011-06-07T19:49:58.780 に答える
3

このようなものはどうですか?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/ (RxBuilder の実験用に作成)

xs ディスポーザブルは配線されていません。使い捨てを配線しようとするとすぐに、スタックを爆破することに戻ります。

于 2011-06-01T00:01:53.460 に答える
2

この計算式 (別名 Monad) から構文糖衣を取り除くと、次のようになります。

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

または C# の場合:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

これは間違いなく末尾再帰ではありません。末尾再帰にすることができれば、おそらく問題は解決すると思います

于 2011-05-29T08:35:39.707 に答える