6

多くのサーバーレットの 1 つに接続している Web サーバーがあります。Web サーバーは、それぞれの実行に 20 分または 30 時間かかる可能性のある最大 40 個のジョブをキューに入れる場合があります。

Web サーバーはソケットを使用してサーバーレットに接続し、サーバーレットはスレッドを使用して送信されたジョブを実行します。

一度に実行できるスレッド (ジョブ) の数を 3 に制限し、その制限に達すると、メイン スレッドが保持されます。スレッドの 1 つが終了すると、メイン スレッドが続行して別のジョブを取得できるようになります。

# Wait for thread count to reduce before continuing
while threading.active_count() >= self.max_threads:
    pass 

私は現在ループを使用して、空きスレッドが利用可能になるまでメインスレッドを待機させています。それは機能しますが、迅速で汚い解決策のように感じます. それを行うためのより良い方法があるのではないかと思いますか?

サーバー.py

import socket
import sys
import urllib, urllib2
import threading
import cPickle

from supply import supply


class supply_thread(threading.Thread):

    def __init__(self, _sock):
        threading.Thread.__init__(self)
        self.__socket = _sock

    def run(self):
        data = self.readline()
        self.__socket.close()
        new_supply = supply.supply(data)
        new_supply.run()

    def readline(self):
        """ read data sent from webserver and decode it """

        data = self.__socket.recv( 1024 )
        if data:
            data = cPickle.loads(data)
            return data



class server:

    def __init__(self):
        ## Socket Vars
        self.__socket = None
        self.HOST = ''
        self.PORT = 50007
        self.name = socket.gethostname()

        self.max_jobs = 3


    def listen(self):
        """ Listen for a connection from the webserver """

        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allows quick connection from the same address
        self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.__socket.bind((self.HOST, self.PORT))
        return self.__socket.listen(1)

    def connect(self):
        webserver = self.__socket.accept()[0]
        print 'Connected by', webserver

        new_thread = supply_thread(webserver)
        print 'Starting thread' , new_thread.getName()

        new_thread.start()

    def close(self):
        return self.__socket.close()


    def run(self):
        import time

        while True:
            print(sys.version)

            # Wait for connection from Webserver
            self.listen()

            time.sleep(3)

            # Let the Webserver know I'm avilable
            self.status(status='Available')

            print 'Waiting for connection...'
            self.connect()

            print 'thread count:', threading.enumerate()
            print 'thread count:', threading.active_count()

            while threading.active_count() >= self.max_jobs:
                pass


    def status(self, status='Available'):
        computer_name = socket.gethostname()
        svcURL = "http://localhost:8000/init/default/server"
        params = {
            'computer_name':computer_name,
            'status':status,
            'max_jobs':self.max_jobs
        }
        svcHandle = urllib2.urlopen(svcURL, urllib.urlencode(params))
4

1 に答える 1

5

これはCeleryの良い使用例のように思えます。

基本的には、ファイルに Celery タスクを作成し、tasks.pyそれを で呼び出しますtaskname.delay()。Celery ワーカーにタスクをディスパッチし、ワーカーが別のタスクを受け入れる準備ができている場合に作業を開始します。ドキュメントの例を次に示します。

ドキュメント に従って、Celery はデフォルトでマシンのコア数と同じ同時実行数を持つワーカーを作成します。必要に応じて変更できます。

または、 Queuesを使用できます。それがどのように見えるかの別の例を次に示します。

于 2013-05-10T04:07:04.257 に答える