2

次のコードでは、1 つの値を生成してから完了する単純なオブザーバブルを作成します。次に、最後のアイテムを再生し、3 回サブスクライブすることを共有します。1 回目は直後、2 回目は値が生成される前、3 回目は値が生成されてオブザーバブルが完了した後です。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
}).shareReplay(1);

obs$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );  
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );  
}, 6000);

これをjsbinで実行できます

これにより、次の大理石図が得られます

Actual
s1: -----1$
s2:   \--1$
s3:           \1$

しかし、私は期待します

Expected
s1: -----1$
s2:   \--1$
s3:           \----2$

誰かが最初の動作をしたい理由は理解できますが、私の理由は、この例とは異なり、数値を返す場合とは異なり、データベース接続など、サブスクライブ解除動作の影響を受けやすいオブジェクトを返す可能性があるからです。上記のマーブル ダイアグラムがデータベース接続を表している場合、dispose メソッドで を呼び出しますdb.close()。3 番目のサブスクリプションでは、解放されたデータベース ハンドラーを値として受け取っているため、例外が発生します。(2 番目のサブスクリプションが終了すると refCount = 0 になり、ソースが破棄されるため)。

また、この例のもう 1 つの奇妙な点は、最初の値で解決され、その直後に完了し、ソースを 2 回サブスクライブしていることです (複製された「Creating observable」でわかるように)。

このgithubの問題がこれについて話していることは知っていますが、欠けているのは次のとおりです。

ソースオブザーバブルが完了していない場合に最後のアイテムを再生できる共有オブザーバブルをどのように達成できますか (refCount = 0)、オブザーバブルを再作成します。

RxJs5 では、share メソッドが問題の再接続部分を解決すると思いますが、共有部分は解決しません。

RxJs4では私は無知です

可能であれば、既存の演算子またはサブジェクトを使用してこれを解決したいと考えています。私の直感では、そのようなロジックを使用して別のサブジェクトを作成する必要があることがわかりますが、まだそこまで進んでいません。

4

1 に答える 1

1

shareReplay について少し説明します。

shareReplayReplaySubject返されたオブザーバブルの存続期間中、同じ基本インスタンスを保持します。

完了ReplaySubjectすると、これ以上値を入れることはできませんが、再生は続けられます。そう...

  1. 初めてオブザーバブルをサブスクライブすると、タイムアウトが開始されます。これiは から0まで増加し1ます。
  2. 2回目にオブザーバブルをサブスクライブすると、タイムアウトがすでに発生しています。
  3. タイムアウト コールバックが発生し、 を送信しonNext(i)、次にonCompleted().
  4. onCompleted()signal は のReplaySubject内部を完了しますshareReplay。つまり、今後は、その共有オブザーバブルは、それが持つ値 (1) を単純に再生して完了することを意味します。

一般的な共有オブザーバブルについて少し説明します。

もう1つの別の問題は、オブザーバブルを共有したため、サブスクライバー関数を1回だけ呼び出すことです。つまり、i一度だけインクリメントされます。したがってonCompleted、基礎となるReplaySubjectを強制終了しなかったとしても、それを にインクリメントしないことになり2ます。

これは RxJS 5 ではありません

見分ける簡単な方法はonNextvsnextです。現在、例では RxJS 4 を使用していますが、これに RxJS 5 のタグを付けており、RxJS 5 で問題を発見しました。RxJS 5 はベータ版であり、RxJS 4 を完全に書き直した新しいバージョンです。 API の変更は主に、現在ステージ 1 にある es-observable 提案に一致するように行われました。

更新された例

期待される結果が得られるように例を更新しました

基本的に、最初の 2 つの呼び出しにはオブザーバブルの共有バージョンを使用し、3 番目の呼び出しには元のオブザーバブルを使用します。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
})


let shared$ = obs$.shareReplay(1);

shared$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  shared$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );  
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );  
}, 6000);

無関係

また、ヒント: を呼び出すカスタム オブザーバブルのキャンセル セマンティックを返すようにしてくださいclearTimeout

于 2016-04-22T16:59:29.603 に答える