0

技術的背景

Web ブラウザーで jQuery を使用して、ログから一連のエントリを返す API を呼び出しています。

API リクエストは次の 2 つのパラメータを取ります。

  • offset_timestamp: 必要な最も早いエントリを指定する整数
  • limit: 返すレコード数を指定する整数

リクエストとレスポンスの例

Request with parameters:

- offset_timestamp = 100
- limit = 50


curl "https://example.com/log?offset_timestamp=100&limit=5"


Resulting JSON Response:

{
    next_timestamp: 222,
    end_of_log: false,
    entries: [
        {
            timestamp: 111,
            data: { ... }
        },
        {
            timestamp: 112,
            data: { ... }
        },

        ...

        {
            timestamp: 160,
            data: { ... }
        }
    ]
}

単純な jQuery + コールバックを使用していた場合、AJAX 呼び出しを再帰的に連鎖させる必要があると思います。次のようなもの:

// NOTE: I have NOT tested this code.
//       I just wrote it for illustration purposes

function getBatch(offset, limit, callback) {
    var url = "https://example.com/logs?offset_timestamp=" + offset + "&limit=" + limit;
    var ajaxParams = {
        method: "GET",
        url: url
    };

    jQuery.ajax(ajaxParams))
        .done(function(data) {
            if (data.end_of_log) {
                return callback(data.entries);
            }
            return getBatch(data.next_timestamp, limit, function(entries) { 
                return callback(entries.concat(data.entires));
            });
        });
}

function processEntries(entries) {
    for (var i = 0; i < entries.length; i++) {
        console.log(i.data);
    }
}

getBatch(0, 50, processEntries);

flatMap()当然のことながら、すべてのエントリのシーケンスを取得するために使用できるように、Observable のシーケンス (それぞれがエントリのバッチを 1 つ保持) を使用したいと考えています。

質問

jQuery 呼び出しから Observable を作成する場合、たとえばRx.Observable.fromPromise(jQuery.ajax({...}))、RxJS を使用してresponse.next_timestamp、後続の呼び出しのパラメーターで前の呼び出しの値を使用して、これらの Observable を任意の数だけチェーンすることは可能ですか?

4

2 に答える 2

1

Rx.Subjectストリームへの書き込み (データのプッシュ) とストリームからの読み取り (サブスクライブ) を可能にします。Rx.Subjectのように動作しRx.Observableますが、インスタンスにはonNextデータをプッシュできるメソッドがあります。

リクエストを としてモデル化しRx.Subject、レスポンスのストリームをそれに依存させることができます。次に、レスポンスをサブスクライブして、次のリクエストをリクエスト ストリームにプッシュします。

// Represents our stream of requests
const request$ = new Rx.Subject();

// Represents our AJAX responses
const response$ = request$
    .startWith(initialRequest)
    .flatMap(makeAjaxCall)

    // Convert to a hot stream so we don't end up making an AJAX
    // request for every subscriber of this stream.
    .share();

// And we can map over response$ to get the updated timestamp
const timestamp$ = response$.map(getTimeStamp);

// And we can subscribe to the timestamp$ stream to push new requests
timestamp$.subscribe(timestamp => {
    const request = makeRequestFromTimestamp(timestamp);

    // Call onNext to push a new item onto our request$ stream
    request$.onNext(request);
});

// And we can subscribe to our responses. Note that our response$
// stream will continue indefinitely.
response$.subscribe(x => console.log(x));
于 2016-03-16T18:48:12.177 に答える
1

expand演算子を使用しようと思います。ここで使用例を確認することもできます: RxJs: How to loop based on state of the observable? 、およびここ: RxJS、API をポーリングして、動的タイムスタンプを使用して更新されたレコードを継続的にチェックする方法

つまり、オブザーバブルを使用して再帰を実行し、 を返すことで再帰の終わりを知らせることができますRx.Observable.empty()

于 2016-03-12T09:56:16.393 に答える