50

joblib.Parallel実行の全体的な進行状況を追跡する簡単な方法はありますか?

何千ものジョブで構成された長期実行があり、データベースに追跡して記録したいと考えています。ただし、これを行うには、Parallel がタスクを終了するたびにコールバックを実行して、残っているジョブの数を報告する必要があります。

私は以前、Python の stdlib multiprocessing.Pool で同様のタスクを達成しました。これは、Pool のジョブ リストに保留中のジョブの数を記録するスレッドを起動することで実現しました。

コードを見ると、Parallel は Pool を継承しているので、同じトリックを実行できると思いましたが、これらのリストを使用していないようで、内部を「読み取る」方法が他にわかりませんでした。他の方法でステータス。

4

8 に答える 8

21

リンクしたドキュメントにParallelは、オプションの進行状況メーターがあると記載されています。callbackによって提供されるキーワード引数を使用して実装されmultiprocessing.Pool.apply_asyncます。

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

そして、ここにありますprint_progress

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

正直なところ、彼らがこれを実装する方法はちょっと奇妙です-タスクは常に開始された順序で完了すると想定しているようです。indexに行く変数は、ジョブが実際に開始されたときの変数ですprint_progress。したがって、起動された最初のジョブは、たとえ 3 番目のジョブが最初にself.n_dispatched終了したとしても、常に0 で終了します。また、完了したジョブindexの数を実際に追跡していないことも意味します。したがって、監視するインスタンス変数はありません。

最善の方法は、独自の CallBack クラスを作成し、モンキー パッチ Parallel を作成することだと思います。

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

出力:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

そうすれば、デフォルトのジョブではなく、ジョブが完了するたびにコールバックが呼び出されます。

于 2014-07-27T18:24:14.060 に答える