7

Queueを使用してアイテムのリストを段階的に処理できるようにするための確実な実装を探しています。

アイデアは、20以上のデータベース集約型タスクのリストを調べて結果を返す、設定された数のワーカーを使用したいということです。Pythonが最初の5つの項目から開始し、1つのタスクが完了するとすぐに、キュー内の次のタスクから開始するようにします。

これは私が現在スレッドなしでそれをしている方法です。

for key, v in self.sources.iteritems():
    # Do Stuff

同様のアプローチをとりたいのですが、リストを5つのサブグループに分割する必要はないかもしれません。リスト内の次のアイテムを自動的に取得するようにします。目標は、1つのデータベースがプロセスの速度を低下させている場合でも、アプリケーション全体に悪影響を及ぼさないようにすることです。

4

3 に答える 3

5

これは自分で実装できますが、Python 3 にはスレッド管理用の ベースのソリューションが既に付属しており、バックポートされたバージョンExecutorをインストールすることで Python 2.x で使用できます。

コードは次のようになります

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_key = {}
    for key, value in sources.items():
        future_to_idday[executor.submit(do_stuff, value)] = key
    for future in concurrent.futures.as_completed(future_to_key):
        key = future_to_key[future]
        result = future.result()
        # process result
于 2013-01-22T13:45:09.623 に答える
3

python3 を使用している場合は、concurrent futures モジュールをお勧めします。python3 を使用しておらず、(プロセスに対して) スレッドに接続されていない場合は、 multiprocessing.Pool を試すことができます (ただし、いくつかの注意事項があり、アプリケーションでプールが適切に閉じられないという問題がありました)。スレッドを使用する必要がある場合、python2 では、コードを自分で作成することになる可能性があります。消費者関数を実行する 5 つのスレッドを生成し、呼び出し (関数 + 引数) をキューに繰り返しプッシュして、消費者がそれらを見つけて処理できるようにするだけです。

于 2013-01-22T13:49:18.513 に答える
1

stdlib のみを使用して実行できます。

#!/usr/bin/env python
from multiprocessing.dummy import Pool # use threads

def db_task(key_value):
    try:
        key, value = key_value
        # compute result..
        return result, None
    except Exception as e:
        return None, e

def main():
    pool = Pool(5)
    for result, error in pool.imap_unordered(db_task, sources.items()):
        if error is None:
            print(result)

if __name__=="__main__":
    main()
于 2013-01-22T14:52:29.583 に答える