1

私は、プール割り当てのためのPythonでのこの動作に気づきました。プールに20個のプロセスがありますが、たとえば8個のプロセスに対してmap_asyncを実行すると、すべてのプロセスをスローして実行するのではなく、4個しか実行されません。これらの4つが終了すると、さらに2つが送信され、2つが終了すると1つが送信されます。

20を超える値をスローすると、キューで20未満になり始めるまで、20をすべて実行し、上記の動作が繰り返されます。

これは意図的に行われていると思いますが、奇妙に見えます。私の目標は、リクエストが届いたらすぐに処理することですが、明らかにこの動作は適合しません。

maxtasksperchildサポートのためのビリヤードでのPython2.6の使用

どうすればそれを改善できますか?

コード:

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', idx, Returns
    else:
        time.sleep(0.5)
4

1 に答える 1

3

Pythonでマルチプロセッシングを処理する1つの方法は、次のとおりです。

関数を使用したいデータがありますfunction()
まず、マルチプロセッシングサブクラスを作成します。

import multiprocessing

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        self.id_t = id_t
        self.inputlist = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue

        multiprocessing.Process.__init__(self)

    def run(self):
        s = "process number: " + str(self.id_t) + " starting"
        print s
        result = []

        while self.inputqueue.qsize() > 0
            try:
                inp = self.inputqueue.get()
            except Exception:
                pass
            result = self.function(inp)
            while 1:
               try:
                   self.resultqueue.put([self.id,])
               except Exception:
                   pass
               else:
                   break
            self.idqueue.put(id)
            return

および主な機能:

inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(data):
    print data # or what you want

for datum in data:
    inputqueue.put(datum)

for i in xrange(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()

そして最後に結果を得る:

results = []
while idqueue.qsize() < nbprocess:
    pass
while resultqueue.qsize() > 0:
    results.append(resultqueue.get())

このようにして、プロセスやその他のものに追加されるものを完全に制御できます。マルチプロセッシングの使用inputqueueは、キューへの異なるプロセスの同時アクセスのために、各データの計算が非常に遅い(<1.2秒)場合にのみ効率的な手法です(そのため、例外を使用します)。関数の計算が非常に速い場合は、最初にデータを1回だけ分割し、最初にすべてのプロセスのデータセットのチャンクを配置することを検討してください。

于 2012-04-27T16:43:12.610 に答える