11

Rxは私たちのドメインに適しているように思われるので試していますが、学習曲線に驚かされました。

過去の価格データと実際の価格データを組み合わせる必要があります。

私はこれを行うための通常のアプローチをRxの言語に適応させようとしています:

  1. すぐにライブ価格を購読し、私が戻ってきた値のバッファリングを開始します
  2. 過去の価格データのリクエストを開始します(これは、ライブ価格のサブスクリプション後に発生する必要があるため、データにギャップはありません)
  3. 戻ってきたときに過去の価格を公開する
  4. すべての履歴データを受け取ったら、バッファリングされたライブデータを公開し、最初に履歴データと重複する値を削除します
  5. ライブ価格フィードからのデータの再生を続行します

私はこの嫌な間違ったストローマンコードを持っていますが、これは私が書いた素朴なテストケースで機能するようです:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw
}

これにはいくつかの欠点があります

  1. 適切な再生バッファサイズは不明です。無制限のバッファを設定することはできません-これは長時間実行されるシーケンスです。本当に、Subscribeへの最初の呼び出しでフラッシュするある種のワンタイムバッファが必要です。これがRxに存在する場合、私はそれを見つけることができません。
  2. ライブ価格の公開に切り替えた後も、リプレイバッファは引き続き存在します。この時点ではバッファは必要ありません。
  3. 同様に、過去の価格と実際の価格の間の最初の重複をスキップした後は、重複するティックを除外するための述語は必要ありません。私は本当に次のようなことをしたいです:live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */)Wait(this IObservable<TSource>)ここで役に立ちますか?

これを行うためのより良い方法があるはずですが、私はまだ私の脳がFPのようにRxを呟くのを待っています。

1.解決することを検討した別のオプションはISubject、最初のサブスクライバーを取得するまでメッセージをキューに入れる(その後、サブスクライバーを拒否する)独自のRx拡張機能を作成することです。多分それは行く方法ですか?

4

4 に答える 4

1

履歴データとライブデータの両方が時間ベースまたはスケジュールベースである場合、つまり、イベントストリームは時間の経過とともに次のようになります。

|---------------------------------------------------->  time
    h   h   h   h  h  h                                 historical
                l  l  l  l  l  l                        live

単純なTakeUntil構成を使用できます。

var historicalStream = <fetch historical data>;
var liveStream = <fetch live data>;

var mergedWithoutOverlap = 
     // pull from historical
     historicalStream
       // until we start overlapping with live
       .TakeUntil(liveStream)
       // then continue with live data
       .Concat(liveStream);

のように、すべての履歴データを一度に取得する場合は、と他のロジックをIEnumerable<T>組み合わせて使用​​できます。StartWith

var historicalData = <get IEnumerable of tick data>;
var liveData = <get IObservable of tick data>;

var mergedWithOverlap = 
    // the observable is the "long running" feed
    liveData
    // But we'll inject the historical data in front of it
    .StartWith(historicalData)
    // Perform filtering based on your needs
    .Where( .... );
于 2013-02-13T18:31:52.840 に答える
1

次のようなものはどうですか?

public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector)
{
    var replaySubject = new ReplaySubject<T>();
    live.Subscribe(replaySubject);
    return history.Concat(replaySubject).Distinct(selector);
}

これは、シーケンスIDとdistinctを使用して、重複をフィルタリングします。

そして対応するテスト:

var testScheduler = new TestScheduler();

var history = testScheduler.CreateColdObservable(
    OnNext(1L, new PriceTick { PriceId = 1 }),
    OnNext(2L, new PriceTick { PriceId = 2 }),
    OnNext(3L, new PriceTick { PriceId = 3 }),
    OnNext(4L, new PriceTick { PriceId = 4 }),
    OnCompleted(new PriceTick(), 5L));

var live = testScheduler.CreateHotObservable(
    OnNext(1L, new PriceTick { PriceId = 3 }),
    OnNext(2L, new PriceTick { PriceId = 4 }),
    OnNext(3L, new PriceTick { PriceId = 5 }),
    OnNext(4L, new PriceTick { PriceId = 6 }),
    OnNext(5L, new PriceTick { PriceId = 7 }),
    OnNext(6L, new PriceTick { PriceId = 8 }),
    OnNext(7L, new PriceTick { PriceId = 9 })
    );


live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId));
history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId), () => Console.WriteLine("C"));

var combined = live.CombineWithHistory(history, t => t.PriceId);

combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId));

testScheduler.AdvanceTo(6L);

このテストを実行すると、combinedはIDが1から8の価格ティックを発行します。

于 2013-03-28T00:02:25.560 に答える
1

記録のために、これが私が最後にしたことです。私はまだRxの学習者であり、バージョン2.0で最後に見た.Netに戻ります。すべてのフィードバックは非常にありがたく受け取られます。

以下で使用されるTicksオブジェクトには、1つ以上のティック値が含まれている場合があります。履歴データサービスは、いくつかのティックでデータを返します。

public class HistoricalAndLivePriceFeed : IPriceFeed
{
    private readonly IPriceFeed history;
    private readonly IPriceFeed live;
    private readonly IClock clock;

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
:            this(history, live, new RealClock())
        {
    }
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
    {
        this.history = history;
        this.live = live;
        this.clock = clock;
    }

    public IObservable<Ticks> For(DateTime since, ISymbol symbol)
    {
        return Observable.Create<Ticks>(observer =>
        {
            var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));

            var definitelyInHistoricalTicks = clock.Now;
            // Sleep to make sure that historical data overlaps our live data
            // If we ever use a data provider with less fresh historical data, we may need to rethink this
            clock.Wait(TimeSpan.FromSeconds(1));

            var liveStreamAfterEndOfHistoricalTicks = liveStream
               .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
               .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1));

            var subscription = history.For(since, symbol)
               .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1))
               .Concat(liveStreamAfterEndOfHistoricalTicks)
               .Subscribe(observer);

            return liveStream.And(subscription);
        });
    }
}
public static class CompositeDisposableExtensions
{
    public static CompositeDisposable And(this IDisposable disposable, Action action)
    {
        return And(disposable, Disposable.Create(action));
    }

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
    {
        return new CompositeDisposable(disposable, other);
    }
}

これはこのRxコードを使用していますが、私はまだ完全には信頼していません。

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace My.Rx
{
    /// <summary>
    /// Buffers values from an underlying observable when no observers are subscribed.
    /// 
    /// On Subscription, any buffered values will be replayed.
    /// 
    /// Only supports one observer for now.
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods
    /// are hidden. It is not intended that Buffer should be used as an IObserver,
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed.
    /// </summary>
    /// <typeparam name="TSource"></typeparam>
    public class Buffer<TSource> : ISubject<TSource>, IDisposable
    {
        private readonly object gate = new object();
        private readonly Queue<TSource> queue = new Queue<TSource>();

        private bool isDisposed;
        private Exception error;
        private bool stopped;
        private IObserver<TSource> observer = null;
        private IDisposable subscription;

        public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
        {
            return new Buffer<TSource>(observable);
        }

        private Buffer(IObservable<TSource> observable)
        {
            subscription = observable.Subscribe(this);
        }

        void IObserver<TSource>.OnNext(TSource value)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    queue.Enqueue(value);
                else
                    observer.OnNext(value);
            }
        }

        void IObserver<TSource>.OnError(Exception error)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    this.error = error;
                else
                    observer.OnError(error);
                stopped = true;
            }
        }

        void IObserver<TSource>.OnCompleted()
        {
            lock (gate)
            {
                stopped = true;
            }
        }

        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            lock (gate)
            {
                if (isDisposed)
                    throw new ObjectDisposedException(string.Empty);

                if (this.observer != null)
                    throw new NotImplementedException("A Buffer can currently only support one observer at a time");

                while(!queue.IsEmpty())
                {
                    observer.OnNext(queue.Dequeue());
                }

                if (error != null)
                    observer.OnError(error);
                else if (stopped)
                    observer.OnCompleted();

                this.observer = observer;
                return Disposable.Create(() =>
                                             {
                                                 lock (gate)
                                                 {
                                                                             // Go back to buffering
                                                     this.observer = null;
                                                 }
                                             });
            }
        }

        private bool IsBuffering
        {
            get { return observer == null; }
        }


        public void Dispose()
        {
            lock (gate)
            {
                subscription.Dispose();

                isDisposed = true;
                subscription = null;
                observer = null;
            }
        }
    }
}

これらのテストに合格するのはどれですか(私はまだスレッドセーフをチェックしていません):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");

[Test]
public void ReplaysBufferedValuesToFirstSubscriber()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnNext(2);

    var observed = new List<int>();

    buffer.Subscribe(Observer.Create<int>(observed.Add));

    Assert.That(observed, Is.EquivalentTo(new []{1,2}));
}

[Test]
public void PassesNewValuesToObserver()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));

    underlying.OnNext(1);
    underlying.OnNext(2);

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 }));
}


[Test]
public void DisposesOfSubscriptions()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    var observed = new List<int>();

    buffer.Subscribe(Observer.Create<int>(observed.Add))
        .Dispose();

    underlying.OnNext(1);

    Assert.That(observed, Is.Empty);
}

[Test]
public void StartsBufferingAgainWhenSubscriptionIsDisposed()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    // These should be buffered
    underlying.OnNext(1);
    underlying.OnNext(2);

    var firstSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
    {
        // Should be passed through to first subscription
        underlying.OnNext(3);
    }
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 }));

    // First subscription has been disposed-
    // we should be back to buffering again
    underlying.OnNext(4);
    underlying.OnNext(5);

    var secondSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
    {
        // Should be passed through to second subscription
        underlying.OnNext(6);
    }
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6}));
}

[Test]
public void DoesNotSupportTwoConcurrentObservers()
{
    // Use .Publish() if you need to do this

    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    buffer.Subscribe(Observer.Create<int>(i => { }));

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
}

[Test]
public void CannotBeUsedAfterDisposal()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    buffer.Dispose();

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
}

[Test]
public void ReplaysBufferedError()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    var observed = new List<int>();
    Exception foundException = null;
    buffer.Subscribe(
        observed.Add, 
        e => foundException = e);

    Assert.That(observed, Is.EquivalentTo(new []{1}));
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying));
}

[Test]
public void ReplaysBufferedCompletion()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnCompleted();

    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(
        observed.Add,
        () => completed=true);

    Assert.That(observed, Is.EquivalentTo(new[] { 1 }));
    Assert.True(completed);
}

[Test]
public void ReplaysBufferedErrorToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    // Drain value queue
    using (buffer.Subscribe(Observer.Create<int>(i => { }, e => { }))) ;

    var observered = new List<int>();
    Exception exceptionEncountered = null;
    using (buffer.Subscribe(Observer.Create<int>(observered.Add, e=>exceptionEncountered=e)));

    Assert.That(observered, Is.Empty);
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
}

[Test]
public void ReplaysBufferedCompletionToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnCompleted();

    // Drain value queue
    using (buffer.Subscribe(Observer.Create<int>(i => { }))) ;

    var observered = new List<int>();
    var completed = false;
    using (buffer.Subscribe(Observer.Create<int>(observered.Add, ()=>completed=true)));

    Assert.That(observered, Is.Empty);
    Assert.True(completed);
}



[Test]
public void DisposingOfBufferDisposesUnderlyingSubscription()
{
    var underlyingSubscriptionWasDisposed = false;
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed=  true   ));

    var buffer = Buffer<int>.StartBuffering(underlying);
    buffer.Dispose();

    Assert.True(underlyingSubscriptionWasDisposed);
}
于 2013-04-24T18:04:32.167 に答える
0

メモリとトレードオーバーラップ(正確さ)の観点から便利な方法。
あなたのフィードバックを待っています:

var tradeIds = new HashSet<string>();
var replayQuotationTrades = new ReplaySubject<IntradayTrade>();
var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades);
return _historyTrades
                .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler)
                .Do(t => tradeIds.Add(t.TradeId))
                .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades))
                .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId)))
                .Finally(tradeIds.Clear)
                .Concat(_quotationTrades)
                .Subscribe(observer);
于 2016-01-10T23:05:31.143 に答える