2

私はこの次のコードを持っています。

これは decorator と呼ばれる Python モジュールを使用します。

from multiprocessing import Pool
from random import randint
import traceback
import decorator
import time


def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0) # seconds
    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try: return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: '%(tb)

        raise
    return tryIt

デコレータ モジュールは、datawarhouse 呼び出し関数を装飾するのに役立ちます。したがって、接続のドロップやさまざまな接続ベースの問題に対処する必要はなく、接続をリセットしてタイムアウト後に再試行できます。データ ウェアハウスの読み取りを行うすべての関数をこのメソッドでデコレートするので、無料で再試行できます。

私は次の方法を持っています。

def process_generator(data):
    #Process the generated data


def generator():
    data = data_warhouse_fetch_method()#This is the actual method which needs retry
    yield data

@test_retry(number_of_retry_attempts=2,timeout=1.0)
def data_warhouse_fetch_method():
    #Fetch the data from data-warehouse
    pass

このようなマルチプロセッシングモジュールを使用して、コードをマルチプロセスしようとしています。

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator,generator())
except Exception as exception:
    print 'Do some post processing stuff'
    tb = traceback.format_exc()
    print tb 

すべてがうまくいっているときは正常です。また、リトライ回数以内で直れば正常です。しかし、rety の数を超えると、メイン プロセスでキャッチされない test_retry メソッドで例外が発生します。プロセスは終了し、メイン プロセスによってフォークされたプロセスは孤立したままになります。ここで何か間違ったことをしているのかもしれません。次の問題を解決するための助けを探しています。例外を親プロセスに伝播して、例外を処理し、子供たちを優雅に死なせるようにします。また、子プロセスに正常に終了するように通知する方法を知りたいです。助けてくれてありがとう。

編集:説明するコードを追加しました。

def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0) # seconds
    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try: return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: '%(tb)
        raise
    return tryIt

@test_retry(number_of_retry_attempts=2,timeout=1.0)
def bad_method():
    sample_list =[]
    return sample_list[0] #This will result in an exception


def process_generator(number):
    if isinstance(number,int):
        return number+1
    else:
        raise

def generator():
    for i in range(20):
        if i%10 == 0 :
         yield bad_method()
        else:
            yield i

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator,generator())
    pool.close()
    #pool.join()
    for r in result:
        print r
except Exception, e: #Hoping the generator will catch the exception. But not .
    print 'got exception: %r, terminating the pool' % (e,)
    pool.terminate()
    print 'pool is terminated'
finally:
    print 'joining pool processes'
    pool.join()
    print 'join complete'
print 'the end'

実際の問題は、ジェネレーターが例外をスローしている場合に発生します。pool.imap_unordered() メソッドにラップされている except 句で、ジェネレーターによってスローされた例外をキャッチできません。したがって、例外がスローされた後、メインプロセスはスタックし、子プロセスは永遠に待機します。ここで何が間違っているのかわかりません。

4

1 に答える 1