0

処理ファイルを使用するために RxPy を使用し、パイプのロードのシーケンスを構築したい

pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

    rx.from_list(independing_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler),
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("independing frames loaded!"))

    withdraw_file = []
    for file in filtered_files:
        if self._table_name_on_contain(file) == 'mellow':
            withdraw_file += [file]

    rx.from_list(withdraw_file).pipe(
        self._build_apples_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("apples loaded!"))

    rx.from_list(depending_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: self._complete_action())

しかし、予期しない結果が得られました。「停止ポイント」を指定していないため、各パイプが非同期で実行されているようです。2 番目と 3 番目のパイプは、最初のパイプが完了した後でのみ開始するようにします。修正方法は?

4

2 に答える 2