26

RxJs を使用してブラウザーで次のシナリオを実行するにはどうすればよいですか。

  • 処理のためにデータをキューに送信する
  • ジョブ ID を取得する
  • 結果が得られるまで、または 60 秒が経過するまで、1 秒ごとに別のエンドポイントをポーリングします (その後、失敗します)。

私が思いついた中間ソリューション:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );
  1. データが到着するかエラーが発生したときにタイマーを停止する中間変数なしの方法はありますか? 私は今、新しいオブザーバブルを導入してから使用することができましたtakeUntil
  2. ここでの使用はflatMap意味的に正しいですか? たぶん、この全体を書き直して、チェーン化しないでflatMapください。
4

5 に答える 5

43

上から始めて、オブザーバブルに変わるという約束があります。これで値が得られたら、特定の応答 (成功) を受け取るまで、または特定の時間が経過するまで、1 秒に 1 回呼び出しを行う必要があります。この説明の各部分を Rx メソッドにマッピングできます。

「これが値を生成したら」 = map/ flatMap(flatMapこの場合、次に来るものもオブザーバブルになるため、それらを平坦化する必要があるため)

"1 秒に 1 回" =interval

「特定の応答を受け取る」=filter

「または」=amb

「一定の時間が経過した」=timer

そこから、次のようにつなぎ合わせることができます。

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

最初の結果を取得したら、それを 2 つのオブザーバブル間の競合に投影します。1 つは成功した応答を受け取ったときに値を生成し、もう 1 つは一定の時間が経過したときに値を生成します。2 つ目は、監視可能なインスタンスに が存在しないflatMapためです。メソッド onは監視可能なものを返しますが、これも平坦化する必要があります。.throwRx.Observable

amb/timerコンボは、実際には次のように に置き換えることができることがわかりましたtimeout

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

希望のロジックで説明されていなかったため、サンプルに含まれていたを省略しました.delayが、このソリューションに簡単に適合させることができます。

したがって、質問に直接答えるには:

  1. interval上記のコードでは、サブスクライバー数がゼロになった瞬間に が破棄されるため、手動で何かを停止する必要はありません。これは、take(1)またはamb/timeoutが完了したときに発生します。
  2. はい、どちらの場合も、オブザーバブルの各要素を新しいオブザーバブルに投影し、オブザーバブルの結果のオブザーバブルを通常のオブザーバブルにフラット化したいため、元の両方の使用法は有効でした。

ソリューションをテストするために一緒に投げた jsbinを次に示します (返された値を微調整しpollQueueForResultて、目的の成功/タイムアウトを取得できます。迅速なテストのために、時間を 10 で割っています)。

于 2016-03-15T13:32:54.673 に答える
15

@matt-burnell からの優れた回答に対する小さな最適化。次のように、フィルターを置き換え、演算子を最初の演算子に置き換えることができます。

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .first(x => x.completed)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))

  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );

また、ご存じないかもしれませんが、 flatMapオペレーターは RxJS 5.0 の mergeMap のエイリアスです

于 2016-11-30T09:24:45.260 に答える
2

同じユースケースもあり、以下のコードはかなりうまく機能します。

import { timer, Observable } from "rxjs";
import { scan, tap, switchMapTo, first } from "rxjs/operators";

function checkAttempts(maxAttempts: number) {
  return (attempts: number) => {
    if (attempts > maxAttempts) {
      throw new Error("Error: max attempts");
    }
  };
}

export function pollUntil<T>(
  pollInterval: number,
  maxAttempts: number,
  responsePredicate: (res: any) => boolean
) {
  return (source$: Observable<T>) =>
    timer(0, pollInterval).pipe(
      scan(attempts => ++attempts, 0),
      tap(checkAttempts(maxAttempts)),
      switchMapTo(source$),
      first(responsePredicate)
    );
}

試行回数が制限に達した場合、エラーがスローされ、出力ストリームがサブスクライブ解除されます。さらに、responsePredicate として定義された特定の条件が満たされないまで、http リクエストのみを行います。

使用例:

import { of } from "rxjs";

import { pollUntil } from "./poll-until-rxjs";

const responseObj = { body: { inProgress: true } };
const response$ = of(responseObj);
// this is to simulate a http call
response$
  .pipe(pollUntil(1000, 3, ({ body }) => !body.inProgress))
  .subscribe(({ body }) => console.log("Response body: ", body));

setTimeout(() => (responseObj.body.inProgress = false), 1500);
于 2020-05-05T21:54:44.460 に答える
0

上記のAngular / typescriptの書き直されたソリューション:

export interface PollOptions {
  interval: number;
  timeout: number;
}

const OPTIONS_DEFAULT: PollOptions = {
  interval: 5000,
  timeout: 60000
};
@Injectable()
class PollHelper {
  startPoll<T>(
    pollFn: () => Observable<T>, // intermediate polled responses
    stopPollPredicate: (value: T) => boolean, // condition to stop polling
    options: PollOptions = OPTIONS_DEFAULT): Observable<T> {
    return interval(options.interval)
      .pipe(
        exhaustMap(() => pollFn()),
        first(value => stopPollPredicate(value)),
        timeout(options.timeout)
      );
  }
}

例:

pollHelper.startPoll<Response>(
  () => httpClient.get<Response>(...),
  response => response.isDone()
).subscribe(result => {
  console.log(result);
});
于 2020-03-22T14:13:45.263 に答える