2

ipython 並列マップからの非同期結果が到着したときに、その結​​果を繰り返し処理したいと考えています。これを行う唯一の方法は、結果オブジェクトを反復処理することです。ただし、タスクの 1 つが例外を発生させると、反復は終了します。これを行う方法はありますか?以下のコードを参照してください。2 番目のジョブが例外を発生させると、反復は終了します。

from IPython import parallel

def throw_even(i):
    if i % 2 == 0:
        raise RuntimeError('ERROR: %d' % i)
    return i

rc = parallel.Client()
lview = rc.load_balanced_view() # default load-balanced view

# map onto the engines.
args = range(1, 5)
print args
async_results = lview.map_async(throw_even, range(1, 5), ordered=True)

# get results
args_iter = iter(args)
results_iter = iter(async_results)
while True:
    try:
        arg = args_iter.next()
        result = results_iter.next()
        print 'Job %s completed: %d' % (arg, result)            
    except StopIteration:
        print 'Finished iteration'
        break
    except Exception as e:
        print '%s: Job %d: %s' % (type(e), arg, e)

ジョブ 3 と 4 が報告される前に停止する次の出力が得られます

[1, 2, 3, 4]
Job 1 completed: 1
<class 'IPython.parallel.error.RemoteError'>: Job 2: RuntimeError(ERROR: 2)
Finished iteration

これを行う方法はありますか?

4

2 に答える 2

0

This question might be relevant. I dont't really see why you would want to throw an exception from a remote engine, though. Although, if you do want to do it, I think you can do it in the same way I answered the question mentioned. Which I now see you already realized in your comments, but this should do it anyway.

def throw_even(i):
    if i%2:
       return i
    raise(RuntimeError('Error %d'%i)

params = range(1,5)

n_cores = len(c.ids)
for n,p in enumerate( params ):
    core = c.ids[n%n_cores]
    calls.append( c[core].apply_async( throw_even, p ) )

#then you get the results

while calls != []:
    for c in calls:
        try:
             result = c.get(1e-3)
             print(result[0])
             calls.remove( c )
             #in the case your call failed, you can apply_async again.
             # and append the call to calls.
        except parallel.TimeoutError:
             pass
        except Exception as e:
             knock_yourself_out(e)
于 2013-10-23T12:42:06.560 に答える