上から始めて、オブザーバブルに変わるという約束があります。これで値が得られたら、特定の応答 (成功) を受け取るまで、または特定の時間が経過するまで、1 秒に 1 回呼び出しを行う必要があります。この説明の各部分を Rx メソッドにマッピングできます。
「これが値を生成したら」 = map
/ flatMap
(flatMap
この場合、次に来るものもオブザーバブルになるため、それらを平坦化する必要があるため)
"1 秒に 1 回" =interval
「特定の応答を受け取る」=filter
「または」=amb
「一定の時間が経過した」=timer
そこから、次のようにつなぎ合わせることができます。
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.amb(
Rx.Observable.timer(60000)
.flatMap(() => Rx.Observable.throw(new Error('Timeout')))
)
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
最初の結果を取得したら、それを 2 つのオブザーバブル間の競合に投影します。1 つは成功した応答を受け取ったときに値を生成し、もう 1 つは一定の時間が経過したときに値を生成します。2 つ目は、監視可能なインスタンスに が存在しないflatMap
ためです。メソッド onは監視可能なものを返しますが、これも平坦化する必要があります。.throw
Rx.Observable
amb
/timer
コンボは、実際には次のように に置き換えることができることがわかりましたtimeout
。
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
希望のロジックで説明されていなかったため、サンプルに含まれていたを省略しました.delay
が、このソリューションに簡単に適合させることができます。
したがって、質問に直接答えるには:
interval
上記のコードでは、サブスクライバー数がゼロになった瞬間に が破棄されるため、手動で何かを停止する必要はありません。これは、take(1)
またはamb
/timeout
が完了したときに発生します。
- はい、どちらの場合も、オブザーバブルの各要素を新しいオブザーバブルに投影し、オブザーバブルの結果のオブザーバブルを通常のオブザーバブルにフラット化したいため、元の両方の使用法は有効でした。
ソリューションをテストするために一緒に投げた jsbinを次に示します (返された値を微調整しpollQueueForResult
て、目的の成功/タイムアウトを取得できます。迅速なテストのために、時間を 10 で割っています)。