RxJSrepeatWhen
オペレーターを使用してネットワーク再試行を作成しようとしています。新しいリクエストがスケジューラーに届いたら、そのリクエストを直接試し、結果としてネットワーク障害が発生した場合は、それをプールに追加して後で再試行するという考え方です。したがって、私のスケジューラの開始点は、次のようなジョブを実行するキュー関数です。
queue({ operation, body }) {
const behaviourSubject = new BehaviorSubject();
const task = {
operation,
body,
behaviourSubject,
};
this.doTask(task).subscribe({
error: _ => this.notifier.next({ tasks: [task], remove: false }),
complete: () => console.log('Task successfully done on first try: ', task),
});
return behaviourSubject;
}
ワーカーの通知者として使用されるですthis.notifier
。Subject
したがって、ワーカー自体は次のようになります。
const rawWorker = new Observable(subscriber => {
const doneTasks = [];
const jobs = [];
for (const task of this.tasks) {
jobs.push(
this.doTask(task).pipe(
tap(_ => {
doneTasks.push(task);
}),
catchError(error => { task.behaviourSubject.next(error); return of(error); }),
)
);
}
if (jobs.length > 0) {
forkJoin(...jobs).subscribe(_ => {
this.notifier.next({ tasks: doneTasks, remove: true });
subscriber.next(`One cycle of worker done. #${doneTasks.length} task(s) done and #${this.tasks.length} remaining.`);
// subscriber.complete();
if (this.tasks.length > 0) {
this.notifier.next();
}
})
} else {
subscriber.complete();
}
});
const scheduledWorker = rawWorker.pipe( // TODO: delay should be added to retry and repeat routines
retry(),
repeatWhen(_ => this.notifier.pipe(
filter(_ => this.tasks.length > 0),
)),
);
また、ノーティファイアーは、次のような配列ですべての元に戻されたリクエストを追跡します。
this.notifierSubscription = this.notifier
.pipe(
filter(data => data && data.tasks)
)
.subscribe({
next: ({ tasks = [], remove = false }) => {
if (remove) {
console.log('removing tasks: ', tasks);
this.tasks = this.tasks.filter(task => !tasks.some(tsk => task === tsk));
} else {
console.log('inserting: ', tasks);
this.tasks.push.apply(
this.tasks,
tasks,
);
}
console.log('new tasks array: ', this.tasks);
}
});
私が知っているように、ワーカーのサイクルが完了していない場合、repeatWhen は何の関係もありません。たとえば、パーツを削除すると:
else {
subscriber.complete();
}
ワーカーから、ワーカーの最初の試行 (空のサイクル) で、Observable は完了せず、repeatWhen は何もしません。しかし一方で、ご覧// subscriber.complete();
のとおり、ジョブが存在する場合にコメントしましたが、繰り返しが発生しています。そして、問題の最悪の部分は、ワーカーの多くの異なるインスタンスが並行して実行され、多くの重複したリクエストが作成されることです。
私はこの問題に多くの時間を費やしましたが、追跡する手がかりがありません。