3

以下を例にとります。

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();

ここで達成しようとしているのは、任意の時点でシーケンス内の最新のアイテムの値を「同期的に」取得することです。つまり、次のような拡張機能ではFirstAsync対応できません。

StartWithandReplayビットは、常に値が存在することを保証します。RefCount実際のコードでは、処分アクションをいつ実行できるかを検出するためにビットが必要です。

この「任意の時間」部分をシミュレートするために、5 秒後に最新の値を取得してみましょう。

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
});

したがって、5秒の遅延5で、シーケンスから値を取得する必要があります。これらは、これまでに試したものの成功していません。

  1. ob.First()- 500 を返します
  2. ob.Latest().Take(1)- 同上
  3. ob.MostRecent(-1).First()- 同上
  4. ob.MostRecent(-1)- IEnumerable<long>「500」でいっぱいです
  5. ob.Last()- シーケンスが完了するのを待っているため、戻りません。
  6. ob.Latest().Last()- 同上
  7. ob.ToTask().Result- 同上
  8. ob.ToEnumerable()- 同上
  9. ob.MostRecent().Last()同上

人々が実際にこれを行うことができるリソースはあまりないようです。私が見つけることができる最も近いものはこれです: " Rx: Observable ストリームから最初と最新の値を取得するための演算子" ですが、結局同期呼び出しではない (まだサブスクリプションを使用している) ため、うまくいきません。

これが実際に実行可能かどうかを知っている人はいますか?

4

4 に答える 4

3

コードが期待どおりに機能しない可能性がある理由を指摘する

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note at this point `ob` has never been subscribed to,
// so the Reference-count is 0 i.e. has not be connected.

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.

    //Here we make our first subscription to the `ob` sequence.
    //  This will connect the sequence (invoke subscribe)
    //   which will
    //      1) invoke StartWith
    //      2) invoke onNext(500)
    //      3) invoke First()
    //      4) First() will then unsubscribe() as it has the single value it needs
    //      5) The refCount will now return to 0
    //      6) The sequence will be unsubscribed to.
    ob.First().Dump();  

    //Any future calls like `ob.First()` will thus always get the value 500.
});

潜在的にあなたが欲しいのは

var ob = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish(500);
var connection = ob.Connect();
//Note at this point `ob` has never been subscribed to, so the ReferenceCount is 0 i.e. has not be connected.

var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
    ob.First().Dump();
});

//Sometime later
subscription.Dispose();
connection.Dispose()

ただし、同期呼び出しと Rx を混在させたくはありません。また、通常、サブスクリプション内でサブスクライブすることは望ましくありません (サブスクリプションと同様.First())。おそらく、最新の値を取得して、どこかに保管することを意味しています。使い方.First()はただの滑り台です。あなたはおそらく次のようなものを書く方が良いでしょう

var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
    .SelectMany(_=>ob.Take(1))
    .Subscribe(x =>
    {
        //Do something with X here.
        x.Dump();
    }); 
于 2016-10-17T07:41:51.127 に答える
0

これを少し明確にするために、@ LeeCampbellの回答に感謝します。

機能していなかったもの:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    ob.First().Dump();
    // This gives you 500.
    // Because this is the first time any one subscribes to the observable,
    // so it starts right here and gives you the initial value.
});

実際に機能するもの:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
ob.Subscribe(); // Subscribe to start the above hot observable immediately.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    ob.First().Dump(); 
    // This would give you either 3 or 4, depending on the speed and timing of your computer.
});
于 2016-10-17T23:26:54.760 に答える
-1

この回答が役立つかどうかはわかりませんが、BehaviorSubject を調べましたか? 最新の値を記憶する IObservable です。これは、通常の変数とオブザーバブルを 1 つに組み合わせたようなものです。

それ以外の場合は、「ob」にサブスクライブして、最新の値を自分で変数に格納してみませんか?

于 2016-10-17T06:41:16.807 に答える