0

python3 で複数のプロセスを実行する際に問題があります。

私のプログラムは次のことを行います: 1. sqllite データベースからエントリを取得し、input_queue に渡します。3. output_queue からアイテムを取得して出力するスレッドを作成します (このスレッドは明らかに最初の 2 つのステップの前に開始されます)。

私の問題は、現在、ステップ 2 の「関数」が設定されたプロセス数と同じ回数しか実行されないことです。たとえば、プロセス数を 8 に設定すると、8 回しか実行されずに停止します。input_queue からすべての項目が取り除かれるまで実行し続けると想定しました。

データベースからエントリを取り出す関数 (ステップ 1) を別のプロセスに書き直し、その出力キューをステップ 2 の入力キューとして渡す必要がありますか?

編集:コードの例を次に示します。データベースエントリの代わりに数字のリストを使用しましたが、これは同じように実行されます。リストに 300 個のアイテムがあり、300 個のアイテムすべてを処理したいのですが、現時点では 10 (割り当てたプロセスの数) しか処理できません。

#!/usr/bin/python3
from multiprocessing import Process,Queue
import multiprocessing
from threading import Thread


## This is the class that would be passed to the multi_processing function
class Processor:
    def __init__(self,out_queue):
        self.out_queue = out_queue
    def __call__(self,in_queue):
        data_entry = in_queue.get()
        result = data_entry*2
        self.out_queue.put(result)



#Performs the multiprocessing
def perform_distributed_processing(dbList,threads,processor_factory,output_queue):
    input_queue = Queue()


    # Create the Data processors.
    for i in range(threads):
        processor  = processor_factory(output_queue)
        data_proc = Process(target = processor,
                            args   = (input_queue,))

        data_proc.start()

    # Push entries to the queue.

    for entry in dbList:
        input_queue.put(entry)


    # Push stop markers to the queue, one for each thread.

    for i in range(threads):
        input_queue.put(None)

    data_proc.join()
    output_queue.put(None)


if __name__ == '__main__':
    output_results   = Queue()

    def output_results_reader(queue):
        while True:
            item = queue.get()
            if item is None:
                break
            print(item)


    # Establish results collecting thread.
    results_process = Thread(target = output_results_reader,args   = (output_results,))
    results_process.start()

    # Use this as a substitute for the database in the example
    dbList = [i for i in range(300)]

    # Perform multi processing
    perform_distributed_processing(dbList,10,Processor,output_results)

    # Wait for it all to finish.
    results_process.join()
4

2 に答える 2

2

multiprocessing ライブラリ全体を再度書き直そうとしないでください。multiprocessing.Pool必要に応じて任意のメソッドを使用できると思います-これがバッチジョブの場合は、同期を使用することもできますmultiprocessing.Pool.map()-入力キューにプッシュする代わりに、スレッドに入力を生成するジェネレーターを作成する必要があります。

于 2013-08-21T01:51:37.340 に答える