処理ファイルを使用するために RxPy を使用し、パイプのロードのシーケンスを構築したい
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())
rx.from_list(independing_files).pipe(
self._build_dataflow(),
ops.subscribe_on(pool_scheduler),
).subscribe(
on_next=lambda file: logger.info(f'file: {file}'),
on_error=print,
on_completed=lambda: logger.info("independing frames loaded!"))
withdraw_file = []
for file in filtered_files:
if self._table_name_on_contain(file) == 'mellow':
withdraw_file += [file]
rx.from_list(withdraw_file).pipe(
self._build_apples_dataflow(),
ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda file: logger.info(f'file: {file}'),
on_error=print,
on_completed=lambda: logger.info("apples loaded!"))
rx.from_list(depending_files).pipe(
self._build_dataflow(),
ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda file: logger.info(f'file: {file}'),
on_error=print,
on_completed=lambda: self._complete_action())
しかし、予期しない結果が得られました。「停止ポイント」を指定していないため、各パイプが非同期で実行されているようです。2 番目と 3 番目のパイプは、最初のパイプが完了した後でのみ開始するようにします。修正方法は?