4

LinuxでDjangoを実行していて、ビューがあり、そのビューが作成するファイルを操作するcmdというサブプロセスからデータを返すようにしたいとします。たとえば、次のようになります。

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

ここで、cmdの起動時間は非常に遅いが、動作時間は非常に速く、ネイティブにデーモンモードがないとします。このビューの応答時間を改善したいと思います。

ワーカープールでcmdのインスタンスをいくつか起動し、入力を待機させ、call_processにそれらのワーカープールプロセスの1つにデータの処理を依頼させることで、システム全体の実行速度を大幅に向上させたいと思います。

これは実際には2つの部分です。

パート1。cmdおよびcmdを呼び出す関数は入力を待ちます。これはパイプで行うことができます、すなわち

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

パート2。データを待機しているバックグラウンドで実行されている一連のワーカー。つまり、サブプロセスがすでに実行されるように上記を拡張する必要があります。たとえば、Djangoインスタンスが初期化されるとき、またはこのcall_processが最初に呼び出されるときに、これらのワーカーのセットが作成されます。

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

次のように、どこかにワーカーの初期化が必要です。

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

さて、私が上に持っているものは次のようなものになります:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

さて、質問:

  1. これは機能しますか?(これを頭のてっぺんからStackOverflowに入力したので、問題があると確信していますが、概念的には機能します)

  2. 探すべき問題は何ですか?

  3. これに代わるより良い方法はありますか?たとえば、スレッドも同様に機能しますか(Debian Lenny Linuxです)?このような並列プロセスワーカープールを処理するライブラリはありますか?

  4. 意識すべきDjangoとのやり取りはありますか?

読んでくれてありがとう!これが私と同じくらい面白い問題だと思っていただければ幸いです。

ブライアン

4

3 に答える 3

3

Issy はすでに Celery について言及しましたが、コード サンプルではコメントがうまく機能しないため、代わりに回答として返信します。

セロリを AMQP 結果ストアと同期して使用するようにしてください。実際の実行を別のプロセスまたは別のマシンに分散することもできます。セロリで同期的に実行するのは簡単です。例:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

AMQP 結果ストアにより、結果の返送が非常に高速になりますが、現在の開発バージョンでのみ利用可能です (0.8.0 になるようにコードが凍結されています)。

于 2009-09-18T17:05:58.347 に答える
0

python-daemonまたはその後継であるgrizzledを使用してサブプロセス呼び出しを「デーモン化」するのはどうですか。

于 2009-09-16T00:15:06.613 に答える