7

ファイルから読み込まれた入力データに対して並列計算を実行したいと考えています。(ファイルは非常に大きくなる可能性があるため、これにはジェネレーターを使用します。)

特定の数のアイテムでは、コードは正常に実行されますが、このしきい値を超えるとプログラムがハングします (一部のワーカー プロセスが終了しません)。

助言がありますか?(これを python2.7、8 CPU で実行しています。5,000 行はまだ問題ありませんが、7,500 行は機能しません。)

まず、入力ファイルが必要です。bash で生成します。

for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done

次に、これを実行します。

python2.7 main.py 100 counter.txt > run_log.txt

main.py:

#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass    

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
        yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers

    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)

    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                                      args=(job_queue, result_queue))
        workers_list.append(tmp)

    for worker in workers_list:
        worker.start()

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)
4

1 に答える 1

8

これは、あなたのコードには何かを取り除く result_queueものがないからです。その後の動作は、内部キューのバッファリングの詳細に依存します。待機しているデータが「それほど多くない」場合、すべて正常に表示されますが、待機しているデータが「大量」の場合、すべてがフリーズします。内部魔法の層が含まれているため、これ以上は言えません;-)しかし、ドキュメントはそれについて警告しています:

警告

前述のように、子プロセスがアイテムをキューに入れている場合 (かつ、JoinableQueue.cancel_join_thread を使用していない場合)、バッファリングされたすべてのアイテムがパイプにフラッシュされるまで、そのプロセスは終了しません。

これは、そのプロセスに参加しようとすると、キューに入れられたすべてのアイテムが消費されたことを確認しない限り、デッドロックが発生する可能性があることを意味します。同様に、子プロセスが非デーモンの場合、親プロセスは、すべての非デーモンの子プロセスに参加しようとすると、終了時にハングすることがあります。

マネージャーを使用して作成されたキューには、この問題がないことに注意してください。プログラミングのガイドラインを参照してください。

それを修復する簡単な方法の 1 つ: 最初に追加します。

            result_queue.put(None)

eat_queue()戻る前に。それから加えて:

count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1

メインプログラム.join()の前にワーカーがあります。これにより、結果キューが空になり、すべてが正常にシャットダウンされます。

ところで、このコードはかなり奇妙です:

while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass

なぜノンブロッキングをしているのget()ですか?これは、キューが空である限り、CPU を大量に消費する「ビジー ループ」に変わります。の主なポイントは、作業が現れるのを効率的に待つ方法を.get()提供することです。そう:

while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)

同じことをしますが、はるかに効率的です。

キュー サイズに関する注意事項

あなたはこれについて質問していませんが、噛まれる前に説明しましょう ;-) デフォルトでは、Queueのサイズに制限はありません。たとえば、 に 10 億個のアイテムを追加すると、10 億個のアイテムQueueを保持するのに十分な RAM が必要になります。したがって、コンシューマが処理できるよりも速くプロデューサが作業項目を生成できる場合、メモリの使用量はすぐに手に負えなくなります。

幸いなことに、これは簡単に修復できます。キューの最大サイズを指定してください。例えば、

    job_queue = mp.Queue(maxsize=10*workers_num)
                         ^^^^^^^^^^^^^^^^^^^^^^^

次にjob_queue.put(some_work_item)、コンシューマーがキューのサイズを最大値未満に減らすまでブロックします。このようにして、些細な RAM を必要とするキューで巨大な問題を処理できます。

于 2013-12-13T20:19:09.807 に答える