データ抽出ジョブの場合、データがストアから抽出される ProcessPoolExecutor を使用します。抽出がメモリ内のレコード数のしきい値に達すると、抽出されたデータセットは ProcessPoolExecutor を介して別のプロセスに渡され、メイン プロセス/イテレーターはレコードのフェッチを続けます。ProcessPoolExecutor によって生成されたプロセスは、受信したデータセット内のレコードを変換、フィルター処理します。
特に大規模なデータセットを操作する場合、次の BrokenProcessPool を頻繁に取得し続けます。それをヒント(より大きなデータセット)として、関連するパラメーターを調整すると、このエラーは少し遅れますが、最終的にエラーがスローされます。情報が不足しているため、それは非決定論的なパズルのままであり、いつ/どのポイントで再び発生するかを特定できません。
私は他の質問を調べましたが、それを情報として使用して進歩することはできません.
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib64/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 295, in _queue_management_worker
shutdown_worker()
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 253, in shutdown_worker
call_queue.put_nowait(None)
File "/usr/lib64/python3.6/multiprocessing/queues.py", line 129, in put_nowait
return self.put(obj, False)
File "/usr/lib64/python3.6/multiprocessing/queues.py", line 83, in put
raise Full
queue.Full
具体的には、queue.Full エラーが発生する queues.py のコード スニペットを次に示します。
def put(self, obj, block=True, timeout=None):
assert not self._closed, "Queue {0!r} is closed".format(self)
if not self._sem.acquire(block, timeout):
raise Full
このエラーがどのような状況で発生するかは不明ですが、
- セマフォの数が不足しているためですか?
- キューがいっぱいになるのは、高速なプロデューサーと低速なコンシューマーのシナリオによるものですか? その場合、キューに書き込むプロセスをブロックすることはできますか?
- キューの空き容量を知る方法はありますか? 要素をキューに正常に配置する前に、ライターを調整できるようにするには?
決定論的にデバッグする方法についての考えや指針はありますか?