0

データ抽出ジョブの場合、データがストアから抽出される ProcessPoolExecutor を使用します。抽出がメモリ内のレコード数のしきい値に達すると、抽出されたデータセットは ProcessPoolExecutor を介して別のプロセスに渡され、メイン プロセス/イテレーターはレコードのフェッチを続けます。ProcessPoolExecutor によって生成されたプロセスは、受信したデータセット内のレコードを変換、フィルター処理します。

特に大規模なデータセットを操作する場合、次の BrokenProcessPool を頻繁に取得し続けます。それをヒント(より大きなデータセット)として、関連するパラメーターを調整すると、このエラーは少し遅れますが、最終的にエラーがスローされます。情報が不足しているため、それは非決定論的なパズルのままであり、いつ/どのポイントで再び発生するかを特定できません。

私は他の質問を調べましたが、それを情報として使用して進歩することはできません.

concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 295, in _queue_management_worker
    shutdown_worker()
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 253, in shutdown_worker
    call_queue.put_nowait(None)
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full

具体的には、queue.Full エラーが発生する queues.py のコード スニペットを次に示します。

def put(self, obj, block=True, timeout=None):
    assert not self._closed, "Queue {0!r} is closed".format(self)
    if not self._sem.acquire(block, timeout):
        raise Full

このエラーがどのような状況で発生するかは不明ですが、

  1. セマフォの数が不足しているためですか?
  2. キューがいっぱいになるのは、高速なプロデューサーと低速なコンシューマーのシナリオによるものですか? その場合、キューに書き込むプロセスをブロックすることはできますか?
  3. キューの空き容量を知る方法はありますか? 要素をキューに正常に配置する前に、ライターを調整できるようにするには?

決定論的にデバッグする方法についての考えや指針はありますか?

4

0 に答える 0