4

私は多くの個別のタスクをこなす仕事をしています。タスクごとに、データをダウンロードして処理し、再度アップロードする必要があります。

処理にマルチプロセッシング プールを使用しています。

よくわからない問題がいくつかあります。

まず、データはおおよそ20MBまで可能です。理想的には、メモリ内で物理的に移動せずに子ワーカープロセスに取得し、結果のデータを移動せずに親プロセスに戻します。一部のツールがフードの下でどのように機能しているかわからないので、プールの引数としてデータを渡すことができるかどうかはわかりませんapply_async(私の理解では、オブジェクトをシリアル化し、到達したら再び作成されます)サブプロセス?)、またはマルチプロセッシングを使用する必要があるかどうQueuemmap? または、他の何か?

私はctypes オブジェクトを見ましたが、プロセスフォークを共有できるときにプールが作成されたときに定義されたオブジェクトのみを理解していますか? 共有する必要のある新しいデータが継続的に入ってくるので、これは私にとっては良くありません。

心配する必要のないことの 1 つは、データへの同時アクセスであるため、どのような種類のロックも必要ありません。これは、データがダウンロードされてから処理が開始され、アップロードも出力データが生成されてから開始されるためです。

私が抱えているもう 1 つの問題は、受信するタスクが急増することがあり、その結果、子プロセスが処理できるよりも速くタスクのデータをダウンロードしていることです。そのため、タスクを完了してデータを破棄するよりも早くデータをダウンロードしていますが、Python はメモリ不足で死んでいます。メモリがほぼいっぱいになったとき、またはジョブ パイプラインのデータが多すぎるときに、ダウンロード段階でタスクを遅らせるにはどうすればよいでしょうか? データバイト数を使用してある種の「ref」カウントを考えていたので、ダウンロードとアップロードの間のデータ量を制限し、数値がしきい値を下回ったときにのみダウンロードできます。子供が時々失敗するのではないかと心配していましたが、子供が持っていたデータをカウントから除外することはできませんでした. このようなことを達成する良い方法はありますか?

4

3 に答える 3

2

(This is an outcome of the discussion of my previous answer)

Have you tried POSH

This example shows that one can append elements to a mutable list, which is probably what you want (copied from documentation):

import posh

l = posh.share(range(3))
if posh.fork():
    #parent process
    l.append(3)
    posh.waitall()
else:
    # child process
    l.append(4)
    posh.exit(0)
print l

-- Output --
[0, 1, 2, 3, 4]
  -- OR --
[0, 1, 2, 4, 3]
于 2012-11-16T17:20:45.027 に答える
2

マルチプロセッシングのドキュメントからの正規の例を次に示します。

from multiprocessing import プロセス、値、配列

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

numarrは共有オブジェクトであることに注意してください。それはあなたが探しているものですか?

于 2012-11-15T12:04:39.900 に答える
0

とにかく自分でこれを理解する必要があるので、これをまとめました。マルチプロセッシングやスレッド化に関しては、私は決して熟達しているわけではありませんが、少なくとも機能します。Arrayもっと賢い方法でできるかもしれませんが、非生タイプに付属のロックの使い方がわかりませんでした。誰かがコメントで改善を提案するかもしれません。

from multiprocessing import Process, Event
from multiprocessing.sharedctypes import RawArray

def modify(s, task_event, result_event):
    for i in range(4):
        print "Worker: waiting for task"
        task_event.wait()
        task_event.clear()
        print "Worker: got task"
        s.value = s.value.upper()

        result_event.set()

if __name__ == '__main__':
    data_list = ("Data", "More data", "oh look, data!", "Captain Pickard")
    task_event = Event()
    result_event = Event()
    s = RawArray('c', "X" * max(map(len, data_list)))
    p = Process(target=modify, args=(s, task_event, result_event))
    p.start()
    for data in data_list:
        s.value = data
        task_event.set()
        print "Sent new task. Waiting for results."
        result_event.wait()
        result_event.clear()
        print "Got result: {0}".format(s.value)
    p.join()

この例でdata_listは、 が事前に定義されていますが、必須ではありません。そのリストから必要な情報は、最長の文字列の長さだけでした。長さに実際的な上限がある限り、問題ありません。

プログラムの出力は次のとおりです。

新しいタスクを送信しました。結果待ち。
ワーカー: タスクを待機中
ワーカー: タスクを取得しました
ワーカー: タスクを待機中
得られた結果: DATA
新しいタスクを送信しました。結果待ち。
ワーカー: タスクを取得しました
ワーカー: タスクを待機中
得られた結果: MORE DATA
新しいタスクを送信しました。結果待ち。
ワーカー: タスクを取得しました
ワーカー: タスクを待機中
得られた結果: 見て、データ!
新しいタスクを送信しました。結果待ち。
ワーカー: タスクを取得しました
得た結果: キャプテン・ピッカード

ご覧のように、btel は実際に解決策を提供しましたが、問題は 2 つのプロセスを互いに足並みをそろえておくことでした。そのため、ワーカーはタスクの準備が整ったときにのみ新しいタスクの作業を開始し、メインのプロセスはプロセスは、完了する前に結果を読み取りません。

于 2012-11-15T13:00:14.243 に答える