2

ブロッキングの問題が発生しています。プッシュからプルに移行しようとしています。つまり、ここでデータにアクセスしたいと思います。この場合は、Observableで処理された後の配列です。

type HistoryBar = 
    {Open: decimal; High: decimal; Low: decimal; Close: decimal; Time: DateTime; Volume: int; RequestId: int; Index: int; Total: int}

let transformBar =
    client.HistoricalData
    |> Observable.map(fun args ->
        {
            Open = args.Open
            High = args.High
            Low =  args.Low
            Close = args.Close
            Time = args.Date
            Volume = args.Volume
            RequestId = args.RequestId
            Index = args.RecordNumber
            Total = args.RecordTotal
       }
    )

let groupByRequest (obs:IObservable<HistoryBar>) = 
    let bars = obs.GroupByUntil((fun x -> x.RequestId), (fun x -> x.Where(fun y -> y.Index = y.Total - 1)))
    bars.SelectMany(fun (x:IGroupedObservable<int, HistoryBar>) -> x.ToArray())

let obs = transformBar |> groupByRequest

client.RequestHistoricalData(1, sym, DateTime.Now, TimeSpan.FromDays(10.0), BarSize.OneDay, HistoricalDataType.Midpoint, 0)

私がobsを購読している場合、私が電話をかけるとすぐにclient.RequestHistoricalDataすべてがうまくいきます。私がやりたいのは、obsを基になるタイプ(この場合は)に変換することですHistoryBar []。運が悪かったので使っwaitてみました。ToEnumberable最後に作成したデータを引き出すための適切なアプローチは何ですか?

ライブラリが通常どのように機能するかを示すために、不自然なC#サンプルコードを追加して編集します。ここで私が本当に理解しようとしているのは、監視可能から標準のリストまたは配列に移行する方法です。私が確信していないのは、そうするために可変構造が必要かどうかです。私が推測しなければならないとしたら、私はノーと言うでしょう。

static void Main(string[] args)
{
...
client.HistoricalData += client_HistoricalData;
client.RequestHistoricalData(1, sym, DateTime.Today, TimeSpan.FromDays(10), BarSize.OneDay, HistoricalDataType.Midpoint, 0);
....
}

static void client_HistoricalData(object sender, HistoricalDataEventArgs e)
{

Console.WriteLine("Open: {0}, High: {1}, Low: {2}, Close: {3}, Date: {4}, RecordId: {5}, RecordIndex: {6}", e.Open, e.High, e.Low, e.Close, e.Date, e.RequestId, e.RecordNumber);
}
4

1 に答える 1

2

この質問では、最初にデータがどのようにロードされるか(レイジー/時変など)が明確になっていないため、時間制限のある値のストリームであると想定します。

コードから、ストリームが完了したときにストリームの最後の値を見つけたいようです。このLastメソッドは、完了時にプッシュされたストリームの最後の値を提供します。ただし、これは同期的であり、ストリームが完了するまでブロックされます。非ブロッキングバージョンはLastAsync、ソースが完了したときに値を生成するObservableを返します。

let from0To4 = 
    Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(5)

let lastValue =
    from0To4.LastAsync()

let disposable =
    lastValue |> Observable.subscribe(log)

部分的にブロックせずにObservableをリストに変換するには、Bufferメソッドを使用できます。Observableが完了するまですべての値をバッファリングするには、を使用しますToList

let fullBuffer =
    from0To4.ToList()

let disposable =
    fullBuffer |> Observable.subscribe(fun ls -> printfn "Buffer(%d): %A" ls.Count ls)

出力:

Buffer(5):seq [0L; 1L; 2L; 3L; ...]

于 2012-12-02T19:33:21.790 に答える