14

Angular 2 プロジェクトでRxJs バージョン 5を使用しています。いくつかのオブザーバブルを作成したいのですが、オブザーバブルがすぐに呼び出されるのは望ましくありません。

バージョン 4では、(たとえば) ControlledコマンドまたはPausable Buffersを使用して呼び出しを制御できました。しかし、その機能はバージョン 5では (まだ) 利用できません。

RxJs 5 でこの種の機能を取得するにはどうすればよいですか?

私の最終的な目標は、作成されたオブザーバブルをキューに入れ、1 つずつ呼び出すことです。次のものは、前のものが正常に処理された場合にのみ呼び出されます。いずれかが失敗すると、キューは空になります。

編集

@Niklas Fasching のコメントにより、Publish操作で実用的なソリューションを作成できました。

JSビン

// Queue to queue operations
const queue = [];

// Just a function to create Observers
function createObserver(id): Observer {
    return {
        next: function (x) {
            console.log('Next: ' + id + x);
        },
        error: function (err) {
            console.log('Error: ' + err);
        },
        complete: function () {
            console.log('Completed');
        }
    };
};

// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {

  console.log('add ' + name);
  // Create an async operation
  var observable = Rx.Observable.create(observer => {
    // Some async operation
    setTimeout(() => 
               observer.next(' Done'), 
               500);
  });
  // Hold the operation
  var published = observable.publish();
  // Add Global subscribe
  published.subscribe(createObserver('Global'));
  // Add it to the queue
  queue.push(published);
  // Return the published so the caller could add a subscribe
  return published;
};

// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));

// Dequeue and run the first
queue.shift().connect();
4

2 に答える 2

20

Rx4 のcontrolledObservable では、サブスクライブ時に引き続き呼び出されます

controlledRxJS 4のオペレーターは、オペレーターのの Observable のフローを制御するだけでした。その時点まで、すべてがそのオペレーターでポンピングされ、バッファリングされます。このことを考慮:

(RxJS 4) http://jsbin.com/yaqabe/1/edit?html,js,console

const source = Rx.Observable.range(0, 5).do(x => console.log('do' + x)).controlled();

source.subscribe(x => console.log(x));

setTimeout(() => {
  console.log('requesting');
  source.request(2);
}, 1000);

からの 5 つの値すべてが、すぐにObservable.range(0, 5)によって出力されることに気付くでしょう。その後、2 つの値を取得する前に 1 秒間 (1000 ミリ秒) 一時停止します。do

つまり、これは実際には背圧制御の錯覚です。結局、その演算子には無制限のバッファーがあります。「上にある」Observable が送信するすべてのものを収集し、 を呼び出してデキューするのを待っている配列request(n)


RxJS 5.0.0-beta.2 の複製controlled

この回答の時点で、controlledオペレーターは RxJS 5 に存在しません。これにはいくつかの理由があります。

RxJS 5 で動作を複製する方法 (今のところ): http://jsbin.com/metuyab/1/edit?html,js,console

// A subject we'll use to zip with the source
const controller = new Rx.Subject();

// A request function to next values into the subject
function request(count) {
  for (let i = 0; i < count; i++) {
    controller.next(count);
  }
}

// We'll zip our source with the subject, we don't care about what
// comes out of the Subject, so we'll drop that.
const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);

// Same effect as above Rx 4 example
source.subscribe(x => console.log(x));

// Same effect as above Rx 4 example
request(3);

背圧制御

現在の「実際の背圧制御」の場合、1 つの解決策は Promise の反復子です。ただし、IoP には問題がないわけではありません。1 つは、各ターンでオブジェクトの割り当てがあることです。すべての値には Promise が関連付けられています。もう一つは、約束なのでキャンセルはありません。

より良い Rx ベースのアプローチは、オブザーバブル チェーンの先頭に「フィード」するサブジェクトを作成し、残りの部分を構成することです。

このようなもの: http://jsbin.com/qeqaxo/2/edit?js,console

// start with 5 values
const controller = new Rx.BehaviorSubject(5);

// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)

const controlled = controller.flatMap(
      // map your count into a set of values
      (count) => source.take(count), 
      // additional mapping for metadata about when the block is done
      (count, value, _, index) => {
        return { value: value, done: count - index === 1 }; 
      })
      // when the block is done, request 5 more.
      .do(({done}) => done && controller.next(5))
      // we only care about the value for output
      .map(({value}) => value);


// start our subscription
controlled.subscribe(x => {
  console.log(x)
});

...近い将来、実際の背圧制御を備えたフロー可能なオブザーバブル型の計画もいくつかあります。この種のシナリオでは、それはよりエキサイティングではるかに優れています.

于 2016-02-11T18:23:04.180 に答える
6

オブザーバブルを公開することで、サブスクリプションからオブザーバブルの開始を分離できます。公開されたオブザーバブルは、接続を呼び出した後にのみ開始されます。

すべてのサブスクライバーは、オブザーバブル シーケンスへの単一のサブスクリプションを共有することに注意してください。

var published = Observable.of(42).publish();
// subscription does not start the observable sequence
published.subscribe(value => console.log('received: ', value));
// connect starts the sequence; subscribers will now receive values
published.connect();
于 2016-02-13T12:09:31.503 に答える