14

多くのワーカーが特定のデータを処理し、結果を親プロセスに返すアルゴリズムに python マルチプロセッシング ライブラリを使用します。ワーカーにジョブを渡すために multiprocessing.Queue を使用し、次に結果を収集します。

ワーカーがデータの一部のチャンクの処理に失敗するまで、すべてうまく機能します。以下の単純化された例では、各ワーカーに 2 つのフェーズがあります。

  • 初期化 - 失敗する可能性があります。この場合、ワーカーを破棄する必要があります
  • データ処理 - データのチャンクの処理が失敗する可能性があります。この場合、ワーカーはこのチャンクをスキップして次のデータを続行する必要があります。

このフェーズのいずれかが失敗すると、スクリプトの完了後にデッドロックが発生します。このコードは私の問題をシミュレートします:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print result

#Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

#Wait for them to finish
jobs.join()

このコードについて 2 つの質問があります。

  1. 失敗したinit()場合、ワーカーが無効であることを検出し、終了するのを待たないようにする方法は?
  2. 失敗したdo_work()場合、結果キューで予想される結果が少なくなることを親プロセスに通知する方法は?

ご協力ありがとう御座います!

4

1 に答える 1