私が持っているファイルの上部にあるpythonスクリプトがあります:
result_queue = Queue.Queue()
key_list = *a large list of small items* #(actually from bucket.list() via boto)
キューはプロセスセーフなデータ構造であることを学びました。私は方法を持っています:
def enqueue_tasks(keys):
for key in keys:
try:
result = perform_scan.delay(key)
result_queue.put(result)
except:
print "failed"
ここでのperform_scan.delay()
関数は実際にセロリ ワーカーを呼び出しますが、関係ないと思います (非同期プロセス呼び出しです)。
私も持っています:
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)
最後に、main()
関数があります:
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
concurrent.futures.wait(futures)
print len(result_queue)
result_queue
print ステートメントの結果は 0 です。しかし、 inのサイズの print ステートメントを含めるとenqueue_tasks
、プログラムの実行中に、サイズが増加し、キューに物が追加されていることがわかります。
何が起こっているのか?