リンクしたドキュメントにParallel
は、オプションの進行状況メーターがあると記載されています。callback
によって提供されるキーワード引数を使用して実装されmultiprocessing.Pool.apply_async
ます。
# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1
...
class CallBack(object):
""" Callback used by parallel: it is used for progress reporting, and
to add data to be processed
"""
def __init__(self, index, parallel):
self.parallel = parallel
self.index = index
def __call__(self, out):
self.parallel.print_progress(self.index)
if self.parallel._original_iterable:
self.parallel.dispatch_next()
そして、ここにありますprint_progress
:
def print_progress(self, index):
elapsed_time = time.time() - self._start_time
# This is heuristic code to print only 'verbose' times a messages
# The challenge is that we may not know the queue length
if self._original_iterable:
if _verbosity_filter(index, self.verbose):
return
self._print('Done %3i jobs | elapsed: %s',
(index + 1,
short_format_time(elapsed_time),
))
else:
# We are finished dispatching
queue_length = self.n_dispatched
# We always display the first loop
if not index == 0:
# Display depending on the number of remaining items
# A message as soon as we finish dispatching, cursor is 0
cursor = (queue_length - index + 1
- self._pre_dispatch_amount)
frequency = (queue_length // self.verbose) + 1
is_last_item = (index + 1 == queue_length)
if (is_last_item or cursor % frequency):
return
remaining_time = (elapsed_time / (index + 1) *
(self.n_dispatched - index - 1.))
self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
(index + 1,
queue_length,
short_format_time(elapsed_time),
short_format_time(remaining_time),
))
正直なところ、彼らがこれを実装する方法はちょっと奇妙です-タスクは常に開始された順序で完了すると想定しているようです。index
に行く変数は、ジョブが実際に開始されたときの変数ですprint_progress
。したがって、起動された最初のジョブは、たとえ 3 番目のジョブが最初にself.n_dispatched
終了したとしても、常に0 で終了します。また、完了したジョブindex
の数を実際に追跡していないことも意味します。したがって、監視するインスタンス変数はありません。
最善の方法は、独自の CallBack クラスを作成し、モンキー パッチ Parallel を作成することだと思います。
from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed
class CallBack(object):
completed = defaultdict(int)
def __init__(self, index, parallel):
self.index = index
self.parallel = parallel
def __call__(self, index):
CallBack.completed[self.parallel] += 1
print("done with {}".format(CallBack.completed[self.parallel]))
if self.parallel._original_iterable:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.CallBack = CallBack
if __name__ == "__main__":
print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
出力:
done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
そうすれば、デフォルトのジョブではなく、ジョブが完了するたびにコールバックが呼び出されます。