11

プロセスのプールを使用して多数のジョブを実行し、特定のタイムアウトを適用した後、ジョブを強制終了して、次のタスクに取り組んでいる別のジョブに置き換える必要があります。

multiprocessingワーカーのプールを非同期で実行する方法を提供するモジュールを使用しようとしましたが(例: を使用map_async)、「グローバル」タイムアウトしか設定できず、その後すべてのプロセスが強制終了されます。

時間がかかりすぎる単一のプロセスのみが強制終了され、代わりに新しいワーカーがプールに再度追加される (次のタスクを処理し、タイムアウトしたタスクをスキップする)個別のタイムアウトを設定することは可能ですか?

私の問題を説明する簡単な例を次に示します。

def Check(n):
  import time
  if n % 2 == 0: # select some (arbitrary) subset of processes
    print "%d timeout" % n
    while 1:
      # loop forever to simulate some process getting stuck
      pass
  print "%d done" % n
  return 0

from multiprocessing import Pool
pool = Pool(processes=4)
result = pool.map_async(Check, range(10))
print result.get(timeout=1)    

タイムアウト後、すべてのワーカーが強制終了され、プログラムが終了します。代わりに、次のサブタスクに進みたいと思います。この動作を自分で実装する必要がありますか、それとも既存のソリューションがありますか?

アップデート

ぶら下がっている労働者を殺すことができ、それらは自動的に置き換えられます。だから私はこのコードを思いついた:

jobs = pool.map_async(Check, range(10))
while 1:
  try:
    print "Waiting for result"
    result = jobs.get(timeout=1)
    break # all clear
  except multiprocessing.TimeoutError: 
    # kill all processes
    for c in multiprocessing.active_children():
      c.terminate()
print result

ここでの問題は、ループが終了しないことです。すべてのタスクが処理された後でも、呼び出すgetとタイムアウト例外が発生します。

4

3 に答える 3

10

Pebble Pool モジュールは、この種の問題を解決するために構築されました。特定のタスクのタイムアウトをサポートし、それらを検出して簡単に回復できるようにします。

from pebble import ProcessPool
from concurrent.futures import TimeoutError

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1,2], timeout=5)

try:
    result = future.result()
except TimeoutError:
    print "Function took longer than %d seconds" % error.args[1]

あなたの特定の例について:

from pebble import ProcessPool
from concurrent.futures import TimeoutError

results = []

with ProcessPool(max_workers=4) as pool:
    future = pool.map(Check, range(10), timeout=5)

    iterator = future.result()

    # iterate over all results, if a computation timed out
    # print it and continue to the next result
    while True:
        try:
            result = next(iterator)
            results.append(result)
        except StopIteration:
            break  
        except TimeoutError as error:
            print "function took longer than %d seconds" % error.args[1] 

print results
于 2015-07-02T13:16:07.263 に答える
3

現在、Python は、ワーカー自体の外部にあるプール内の個別の各タスクの実行時間を制御するためのネイティブな手段を提供していません。
したがって、簡単な方法はwait_procspsutilモジュールで使用し、タスクをサブプロセスとして実装することです。
非標準のライブラリが望ましくない場合は、メイン プロセスのワーキング サイクルを持つサブプロセスpoll()モジュールに基づいて独自のプールを実装する必要があります。つまり、各ワーカーの実行と必要なアクションの実行です。

更新された問題に関しては、ワーカーの 1 つを直接終了すると、プールが破損します (このような動作は許可されるべきではないため、インタープリター実装のバグです): ワーカーは再作成されますが、タスクは失われ、プールは失われます。結合不能になります。すべてのプールを終了してから、別のタスクのために再度作成する必要があります。

from multiprocessing import Pool
while True:
    pool = Pool(processes=4)
    jobs = pool.map_async(Check, range(10))
    print "Waiting for result"
    try:
        result = jobs.get(timeout=1)
        break # all clear
    except multiprocessing.TimeoutError: 
        # kill all processes
        pool.terminate()
        pool.join()
print result    

アップデート

Pebbleは、この問題を解決する優れた便利なライブラリです。Pebbleは Python 関数の非同期実行用に設計されており、PyExPoolはモジュールと外部実行可能ファイルの非同期実行用に設計されていますが、どちらも同じ意味で使用できます。

もう 1 つの側面は、サードパーティの依存関係が望ましくない場合、PyExPoolが適切な選択になる可能性があります。これは、ジョブごとおよびグローバルなタイムアウト、ジョブをタスクにグループ化する機会、およびその他の機能を備えたマルチプロセス実行プールの単一ファイル軽量実装です。 .
PyExPoolは、ソースに埋め込んでカスタマイズすることができ、寛大なApache 2.0 ライセンスと製品品質を持ち、1 つの高負荷の科学的ベンチマーク フレームワークのコアで使用されます。

于 2015-05-05T13:22:45.390 に答える
0

各プロセスが別のスレッドでタイムアウトを使用して結合される構成を試してください。そのため、メイン プログラムがスタックすることはなく、プロセスがスタックすると、タイムアウトにより強制終了されます。この手法は、スレッド化モジュールとマルチプロセッシング モジュールを組み合わせたものです。

これは、メモリ内のスレッドの最小 x 数を維持する私の方法です。スレッド化モジュールとマルチプロセッシング モジュールの組み合わせです。尊敬されている仲間のメンバーが上で説明したような他のテクニックには珍しいかもしれませんが、かなりの価値があるかもしれません. 説明のために、一度に最低 5 つの Web サイトをクロールするシナリオを取り上げます。

だからここにあります:-

#importing dependencies.
from multiprocessing import Process
from threading import Thread
import threading

# Crawler function
def crawler(domain):
    # define crawler technique here.
    output.write(scrapeddata + "\n")
    pass

次はthreadController関数です。この関数は、メイン メモリへのスレッドの流れを制御します。threadNum の「最小」制限を維持するためにスレッドをアクティブ化し続けます。5. また、すべてのアクティブなスレッド (acitveCount) が終了するまで終了しません。

最小の threadNum(5) startProcess 関数スレッドを維持します (これらのスレッドは、60 秒のタイムアウトでプロセスに参加しながら、最終的に processList からプロセスを開始します)。threadController を開始すると、上記の制限 5 に含まれない 2 つのスレッドが存在します。Main スレッドと threadController スレッド自体。それが threading.activeCount() != 2 が使用された理由です。

def threadController():
    print "Thread count before child thread starts is:-", threading.activeCount(), len(processList)
    # staring first thread. This will make the activeCount=3
    Thread(target = startProcess).start()
    # loop while thread List is not empty OR active threads have not finished up.
    while len(processList) != 0 or threading.activeCount() != 2:
        if (threading.activeCount() < (threadNum + 2) and # if count of active threads are less than the Minimum AND
            len(processList) != 0):                            # processList is not empty
                Thread(target = startProcess).start()         # This line would start startThreads function as a seperate thread **

startProcess 関数は、別のスレッドとして、プロセスリストからプロセスを開始します。この関数 (**別のスレッドとして開始) の目的は、プロセスの親スレッドになることです。したがって、60 秒のタイムアウトでそれらに参加すると、startProcess スレッドが先に進むのを停止しますが、threadController の実行は停止しません。このように、threadController は必要に応じて機能します。

def startProcess():
    pr = processList.pop(0)
    pr.start()
    pr.join(60.00) # joining the thread with time out of 60 seconds as a float.

if __name__ == '__main__':
    # a file holding a list of domains
    domains = open("Domains.txt", "r").read().split("\n")
    output = open("test.txt", "a")
    processList = [] # thread list
    threadNum = 5 # number of thread initiated processes to be run at one time

    # making process List
    for r in range(0, len(domains), 1):
        domain = domains[r].strip()
        p = Process(target = crawler, args = (domain,))
        processList.append(p) # making a list of performer threads.

    # starting the threadController as a seperate thread.
    mt = Thread(target = threadController)
    mt.start()
    mt.join() # won't let go next until threadController thread finishes.

    output.close()
    print "Done"

メモリ内のスレッドの最小数を維持することに加えて、私の目的は、メモリ内のスタック スレッドやプロセスを回避できるものを用意することでもありました。タイムアウト機能を使用してこれを行いました。入力ミスについてお詫び申し上げます。

この建設がこの世界の誰かに役立つことを願っています。

よろしく、

ヴィカス・ゴータム

于 2015-09-06T16:46:05.157 に答える