2

長時間実行しているファイル I/O タスクをデーモン/サーバー プロセスに移動できるようにしたいと考えています。CLI ツールを使用して、実行する新しいジョブをキューに入れ、実行中のジョブのステータスを照会し、個々のジョブを待機します。Python のmultiprocessing.managers方法は、IPC を処理する単純な方法のように見えます。クライアントがサーバーをブロックせずに待機できるようにしたいのですSyncManager.Eventが、そうしようとすると「サーバーがまだ開始されていません」というアサーションがトリガーされます。皮肉なことに、このアサーションはサーバーからクライアントに送信されるため、明らかにサーバーどこかで開始されます。

最小限の例を次に示します。

#!/usr/bin/env python3
import time
import sys
import concurrent.futures
from multiprocessing.managers import SyncManager

def do_work(files):
    """Simulate doing some work on a set of files."""
    print(f"Starting work for {files}.")
    time.sleep(2)
    print(f"Finished work for {files}.")

# Thread pool to do work in.
pool = concurrent.futures.ProcessPoolExecutor(max_workers=1)

class Job:
    job_counter = 1

    def __init__(self, files):
        """Setup a job and queue work for files on our thread pool."""
        self._job_number = self.job_counter
        Job.job_counter += 1
        print(f"manager._state.value = {manager._state.value}")
        self._finished_event = manager.Event()

        print(f"Queued job {self.number()}.")
        future = pool.submit(do_work, files)
        future.add_done_callback(lambda f : self._finished_event.set())

    def number(self):
        return self._job_number

    def event(self):
        """Get an event which can be waited on for the job to complete."""
        return self._finished_event

class MyManager(SyncManager):
    pass

MyManager.register("Job", Job)

manager = MyManager(address=("localhost", 16000), authkey=b"qca-authkey")
if len(sys.argv) > 1 and sys.argv[1] == "server":

    manager.start()
    print(f"Manager listening at {manager.address}.")

    while True:
        time.sleep(1)
else:
    manager.connect()
    print(f"Connected to {manager.address}.")

    job = manager.Job(["a", "b", "c"])
    job.event().wait()
    print("Done")

クライアントを実行すると、次のように表示されます。

$ ./mp-manager.py
Connected to ('localhost', 16000).
Traceback (most recent call last):
  File "./mp-manager.py", line 54, in <module>
    job = manager.Job(["a", "b", "c"])
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 740, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 625, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 91, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 210, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 403, in create
    obj = callable(*args, **kwds)
  File "./mp-manager.py", line 24, in __init__
    self._finished_event = manager.Event()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 740, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 622, in _create
    assert self._state.value == State.STARTED, 'server not yet started'
AssertionError: server not yet started
---------------------------------------------------------------------------

サーバー出力は次のとおりです。

$ ./mp-manager.py server
Manager listening at ('127.0.0.1', 16000).
manager._state.value = 0
4

0 に答える 0