14

私は1つのプロデューサーと複数のコンシューマーでサーバープログラムを作成しています。私を混乱させるのは、キューに入れられた最初のタスクプロデューサーだけが消費され、その後、キューに入れられたタスクは消費されなくなり、永久にキューに残ります。

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

プロデューサーは、ユーザーからの要求を受信するとタスクをキューに入れるHTTPサーバーです。キューに新しいタスクがある場合、コンシューマープロセスはまだブロックされているようですが、これは奇妙なことです。

PS上記に関係のない別の2つの質問ですが、HTTPサーバーをメインプロセス以外の独自のプロセスに配置する方がよいかどうかわかりません。そうであれば、すべての子プロセスが終了する前にメインプロセスを実行し続けるにはどうすればよいですか。2番目の質問、HTTPサーバーを適切に停止するための最良の方法は何ですか?

編集:プロデューサーコードを追加します。これは単純なPythonWSGIサーバーです。

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
4

3 に答える 3

11

これは完全に機能するため、Webサーバーの部分に何か問題があるに違いないと思います。

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()


Manager().start()

サンプル出力:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
于 2009-05-27T13:08:41.067 に答える
4

「2 番目の質問です。HTTP サーバーを適切に停止する最善の方法は何ですか?」

これは難しい。

プロセス間通信には 2 つの選択肢があります。

  • 帯域外コントロール。サーバーには、別の通信メカニズムがあります。別のソケット、Unix Signal、またはその他のもの。他の何かは、サーバーのローカル ディレクトリにある "stop-now" ファイルである可能性があります。奇妙に思えますが、うまく機能し、選択ループを導入して複数のソケットをリッスンしたり、シグナル ハンドラーを導入して Unis シグナルをキャッチしたりするよりも簡単です。

    「今すぐ停止」ファイルは簡単に実装できます。ループはevwsgi.run()、各要求の後にこのファイルをチェックするだけです。サーバーを停止するには、ファイルを作成し、リクエストを実行します/control(500 エラーか何かが発生しますが、実際には問題ではありません)。サーバーは停止します。stop-now ファイルを忘れずに削除してください。そうしないと、サーバーが再起動しません。

  • インバンド コントロール。サーバーには、停止する別の URL ( /stop) があります。表面的には、これはセキュリティ上の悪夢のように見えますが、このサーバーがどこでどのように使用されるかに完全に依存しています。内部リクエスト キューの単純なラッパーのように見えるため、この追加の URL はうまく機能します。

    これを機能させるevwsgi.run()には、ループから抜け出す方法でいくつかの変数を設定することによって終了できる独自のバージョンを作成する必要があります。

編集

サーバーのワーカースレッドの状態がわからないため、おそらくサーバーを終了したくないでしょう。サーバーにシグナルを送信する必要があり、サーバーが正常に終了するまで待つ必要があります。

サーバーを強制的に強制終了したい場合は、os.kill()(またはmultiprocessing.terminate) が機能します。もちろん、子スレッドが何をしていたのかはわかりません。

于 2009-05-27T14:16:14.987 に答える
1

これが役立ちます: http://www.rsdcbabu.com/2011/02/multiprocessing-with-python.html

于 2011-02-27T10:18:38.230 に答える