0

私はやるべき労働者と仕事を持っています:

workers = ['peter', 'paul', 'mary']
tasks = range(13)

ここで、タスクを作業のチャンクまたはバッチに分割して、各ワーカーが 1 つのバッチで作業し、他のワーカーとほぼ同じ量の作業を行うことができるようにします。実生活では、バッチ ジョブをコンピューティング ファームにスケジュールしたいと考えています。バッチ ジョブは並行して実行することになっています。実際のスケジュールと発送は、lsf や grid などの商用グレードのツールによって行われます。

私が期待するもののいくつかの例:

>>> distribute_work(['peter', 'paul', 'mary'], range(3))
[('peter', [0]), ('paul', [1]), ('mary', [2])]
>>> distribute_work(['peter', 'paul', 'mary'], range(6))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])]
>>> distribute_work(['peter', 'paul', 'mary'], range(5))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]

この質問は、こちらこちら、およびこちらの質問と非常によく似ています

違いは、これらの機能が必要な順序または優先順位であるということです。

  1. を使用lenせず、可能であれば内部で長いデータ構造を構築しない
  2. 発電機を受け入れる
  3. リターンジェネレーター
  4. stdlib コンポーネントを可能な限り使用する

要件に関する補足事項:

  • 意図的にディクテーションはありません: 複数のバッチを実行できる同じ名前のワーカー (UNIX ホスト名) があります。ソリューションで dict を使用している場合は、バッチ列挙によってワーカー ルックアップをいつでも実行できるため、問題ありません。
  • 任意の長さ: ワーカーとタスクの両方が 1 以上の任意の長さの iterable になることができます。また、Mary が 1 つのタスクしか取得しない上記の例に示すように、それらは均等に分割する必要はありません。
  • 順序: 私にとっては重要ではありません。[0,1]、[2,3]、[5] のような順序を好む人もいると思いますが、私は気にしません。あなたのソリューションが順序を維持または切り替えることができる場合は、他の人に指摘する価値があるかもしれません.

私は頭を包み込み、itertoolsこの特定の問題を解決しようとしましたが、質問を説明するために次のコードを思いつきました:

from itertools import *

def distribute_work(workers, tasks):
    batches = range(len(workers))
    return [ ( workers[k],
               [t[1] for t in i]
               )   for (k,i) in groupby(sorted(zip(cycle(batches),
                                                   tasks),
                                               key=lambda t: t[0]),
                                        lambda t: t[0]) ]

これは 4. を満たしますが、並べ替えは 1.. および 2./3. に違反する可能性が非常に高くなります。考えられていません。

おそらく、これにはいくつかの簡単な解決策があり、私が考えもしなかった方法でいくつかの stdlib コンポーネントを組み合わせます。しかし、そうではないかもしれません。テイカーはいますか?

4

5 に答える 5

2

multiprocessing.Pool.imap私はあなたがあなたの労働者を扱いそして彼らの仕事を割り当てるために使用したいと思います。私はそれがあなたが望むすべてをすることを信じます。

jobs = (some generator)                   # can consume jobs from a generator
pool = multiprocessing.Pool(3)            # set number of workers here
results = pool.imap(process_job, jobs)    # returns a generator

for r in results:                         # loop will block until results arrive
    do_something(r)

結果の順序がアプリケーションにとって重要でない場合は、を使用することもできますimap_unordered

于 2012-10-31T09:04:16.793 に答える
1

Following Tyler's answer:

def doleOut(queue, workers):
    for worker,task in itertools.izip(itertools.cycle(workers),queue):
        yield worker,task

This will keep returning (worker, task) tuples as long as there's a queue. So if you have a blocking waitForMoreWork you can do this:

queue = []
doler = distribute_work(workers, queue)
while 1:
    queue.append(waitForMoreWork)
    currentqueuelen = len(queue)
    for i in range(0,queuelen):
        worker,item = doler.next()
        worker.passitem(item)

That way it will block until there are more queue items, then distribute those, then block again. You can set up your waitForMoreWork expression to hand out as many items at a time as seems sensible.

于 2012-10-30T19:43:32.947 に答える
1

プレバッチする必要がありますか?

キューを用意して、各ワーカーがワークユニットを終了したときにキューからポップしないのはなぜですか?

于 2012-10-30T17:55:50.453 に答える
0

わかりました、それは不可能だと言った後、ここにアイデアがあります。たぶん、これは codereview に移行する必要があるものです-これがメモリにどれだけのオーバーヘッドをもたらすかについてのコメントに非常に興味があります. つまり、タスク リストが非常に長く、サイズが不明であるという問題が、これで本当に解決されるかどうかはわかりません。Blckknghtが述べmultiprocessingたように、より良い代替手段になる可能性があります

コード:

import itertools

def distribute_work(workers, tasks):
    """Return one generator per worker with a fair share of tasks

    Task may be an arbitrary length generator.
    Workers should be an iterable.
    """
    worker_count = len(workers)
    worker_ids = range(worker_count)
    all_tasks_for_all_workers = itertools.tee(tasks, worker_count)
    assignments = [ (workers[id], itertools.islice(i, id, None, worker_count))
                    for (id,i) in enumerate(all_tasks_for_all_workers) ]    
    return(assignments)

アルゴリズムは、

  1. ワーカーごとに 1 回、元のタスク リストを複製します。これはジェネレータ オブジェクトを複製するだけなので、メモリ内のタスク リストのサイズにとらわれない必要があります。これが比較的コストのかかる操作である場合でも、起動時のコストは 1 回だけであり、非常に大きなタスクリストのメモリでは重要ではありません。
  2. タスクを 1 人のワーカーに割り当てるには、各ワーカーがタスクリストのスライスを取得する必要があります。がワーカーの数である場合#W、最初のワーカーは 、 、 、などのタスクを実行0#Wます2*#W。23*#W番目のワーカーは0+1#W+12*#W+13*#W+1などを実行します。各ワーカーの接合は、itertools.islice

タスクの純粋な分割/割り当ての場合、この関数ではワーカーの名前は実際には必要ありません。しかし、労働者の量はそうです。これを変更すると、関数がより用途が広く便利になり、戻り値が理解しやすくなります。私自身の質問に答えるために、関数をそのままにしておきます。

使用法と結果:

>>> for (worker,tasks) in distribute_work(['peter', 'paul', 'mary'], range(5)):
...   print(worker, list(tasks))
... 
peter [0, 3]
paul [1, 4]
mary [2]

また、ワーカーの名前が同じでエンティティが異なる場合も処理します。

>>> for (worker,tasks) in distribute_work(['p', 'p', 'mary'], range(5)): 
...   print(worker, list(tasks))
... 
p [0, 3]
p [1, 4]
mary [2]
于 2012-10-31T10:24:00.617 に答える