1

私が持っているファイルの上部にあるpythonスクリプトがあります:

result_queue = Queue.Queue()
key_list = *a large list of small items* #(actually from bucket.list() via boto)

キューはプロセスセーフなデータ構造であることを学びました。私は方法を持っています:

def enqueue_tasks(keys):
    for key in keys:
        try:
            result = perform_scan.delay(key)
            result_queue.put(result)
        except:
           print "failed"

ここでのperform_scan.delay()関数は実際にセロリ ワーカーを呼び出しますが、関係ないと思います (非同期プロセス呼び出しです)。

私も持っています:

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)

最後に、main()関数があります:

def main():

    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    concurrent.futures.wait(futures)
    print len(result_queue)

result_queueprint ステートメントの結果は 0 です。しかし、 inのサイズの print ステートメントを含めるとenqueue_tasks、プログラムの実行中に、サイズが増加し、キューに物が追加されていることがわかります。

何が起こっているのか?

4

2 に答える 2

2

この問題にはもっと簡単な解決策があるようです。

先物のリストを作成しています。先物の要点は、それらが将来の結果であるということです。特に、各関数が返すものは何でも、それが未来の (最終的な) 値です。したがって、「結果をキューにプッシュする」ことをまったく行わず、タスク関数から結果を返し、先物から取得するだけです。


これを行う最も簡単な方法は、そのループを分割して、各キーが個別のタスクであり、個別の未来を持つようにすることです。それが実際のコードに適しているかどうかはわかりませんが、そうであれば:

def do_task(key):
    try:
        return perform_scan.delay(key)
    except:
        print "failed"

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(do_task, key) for key in key_list]
    # If you want to do anything with these results, you probably want
    # a loop around concurrent.futures.as_completed or similar here,
    # rather than waiting for them all to finish, ignoring the results,
    # and printing the number of them.
    concurrent.futures.wait(futures)
    print len(futures)

もちろん、それはグループ化を行いません。しかし、あなたはそれが必要ですか?

グループ化が必要な理由として最も可能性が高いのは、タスクが非常に小さいため、タスクのスケジューリング (および入力と出力のピクル) のオーバーヘッドが実際の作業を圧倒することです。そうであれば、ほぼ確実に、バッチ全体が完了して結果が返されるまで待つことができます。とにかくすべてが完了するまで、結果を見ていないことを考えると特に。(この「グループに分割し、各グループを処理し、再びマージする」というモデルは、各要素が小さいか、要素が互いに独立していない可能性がある数値作業のような場合に非常に一般的ですが、大きなグループがあります十分に、または残りの作業から独立しています。)

とにかく、それはほとんど同じくらい簡単です:

def do_tasks(keys):
    results = []
    for key in keys:
        try:
            result = perform_scan.delay(key)
            results.append(result)
        except:
           print "failed"
    return results

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    print sum(len(results) for results in concurrent.futures.as_completed(futures))

または、最初に待機してから計算する場合:

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    concurrent.futures.wait(futures)
    print sum(len(future.result()) for future in futures)

しかし、繰り返しますが、これさえ必要だとは思いません。

于 2014-10-17T00:16:56.237 に答える