0

一度に最大n個のprocesses.Popenを生成し、それらが実行されるのを待つ基本的なスケジューリングを実装しようとしています。このために私はやった:

CHECKING_INTERVAL = 10

class StandAloneClient(object):
    """
    Instead of communicating with a backend cluster, fire locally a new thread.
    """
    max_thread_nr = 4
    thread_pool = []
    pending_operations = Queue.Queue(0)

    class OperationExecutor(threading.Thread):

        def run(self):
            """
            Get the required data from the operation queue and launch the operation.
            """
            while True:
                launch_data = StandAloneClient.pending_operations.get()
                if launch_data != None:
                    operation_id = launch_data[0]
                    user_name_label = launch_data[1]
                    LOGGER.debug("Launching operation " + str(operation_id) + " with name " + str(user_name_label))
                    ## Create a new process for the new launched operation
                    oper = ['python', '-m', 'processRunner', str(operation_id), user_name_label]
                    launched_process = Popen(oper, stdout=PIPE, stdin=PIPE, stderr=PIPE)
#                    launched_process.wait()
#                    while launched_process.poll() is None:
#                        sleep(CHECKING_INTERVAL)
#                        LOGGER.debug("Operation id=%s is still running. Going to sleep for %s seconds."%(operation_id,
#                                                                                                         CHECKING_INTERVAL))
                    LOGGER.debug("===========================================================")
                    LOGGER.debug("Finished operation %s succesfully."%(operation_id,))


    def __init__(self):
        """
        If there are still empty spots create a new executor thread.
        """
        for _ in xrange(self.max_thread_nr - len(self.thread_pool)):
            new_executor = StandAloneClient.OperationExecutor()
            self.thread_pool.append(new_executor)
            new_executor.start()

    @staticmethod
    def execute(operation_id, user_name_label="Unknown"):
        """Start asynchronous operation locally"""
        StandAloneClient.pending_operations.put((operation_id, user_name_label))

次の方法で操作をキューに追加しています。

StandAloneClient().execute(...)

今、何らかの理由でスレッドをブロックする可能性があると思った部分にコメントしました。しかし、そうは言っても、産まれた子供たちは決して終わらないようです. 実行は最後まで行われ、ログを確認し、最後まで実行processRunner.pyする必要があるすべてのことを行いますが、実行すると、ps -el|grep pythonすべてのプロセスが生成されます。

    0 S  1000   755     1  5  80   0 - 548314 poll_s pts/0   00:00:13 python
0 S  1000  1198   755  4  80   0 - 280172 futex_ pts/0   00:00:09 python
0 S  1000  1201   755  4  80   0 - 280176 futex_ pts/0   00:00:09 python
0 S  1000  1206   755  4  80   0 - 280230 futex_ pts/0   00:00:09 python
0 S  1000  1215   755  4  80   0 - 280198 futex_ pts/0   00:00:09 python
0 S  1000  1216   755  4  80   0 - 281669 futex_ pts/0   00:00:09 python
0 S  1000  1221   755  4  80   0 - 280201 futex_ pts/0   00:00:09 python
0 S  1000  1231   755  4  80   0 - 281668 futex_ pts/0   00:00:09 python
0 S  1000  1240   755  4  80   0 - 280229 futex_ pts/0   00:00:09 python
0 S  1000  1257   755  4  80   0 - 280201 futex_ pts/0   00:00:09 python

Python 2.7.2を使用して、fedora 16マシンでこれを試しています。ご提案 >

よろしく、 ボグダン

4

1 に答える 1

0

あなたのスレッドはデーモンではありません。メインスレッドが終了した後も存続します。pending_operations.get()彼らは永遠にブロックされています。

于 2012-07-19T13:33:49.873 に答える