1

ipythonの並列処理を使ってデータを並列処理しようとしています。ipython並列処理で中間結果を取得する方法に関する質問への回答として、@minrkの指示に従っていますか? . データが異種であるため、一部の処理タスクは他のタスクよりも早く終了します。それらが利用可能になったらすぐに保存したいと考えています。私は次の方法でこれを行います:

from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep( param)
        return param

client = Client()
balanced       = client.load_balanced_view()
balanced.block = False
param_list = range(10)   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

次に、asyncmap をループして、準備ができたら結果を利用できるようにします。

for i in asyncmap:
    print i

問題は、私のコードが例外をスローすることがあることです (上記の例では、呼び出しパラメーターが 8 を超えると IOError が強制的に発生します)、これに対処したいと考えています。ただし、エンジンの 1 つがぐらつきをスローするとすぐに、asyncmap 全体が終了したように見えます。

私が実際に気づいたのは、asyncmap.metadata を問い合わせると、どのメッセージがエラーを出したか (asyncmap.metadata[i]['pyerr']) を非常によく把握できることですが、結果が次のようになるのを待つ方法がわからないということです。彼らはそうします。

したがって、私の質問は、エンジンから非同期に到着した結果を、例外がスローされることがあっても、どのように処理すればよいかということです。コントローラーでの結果の待機を混乱させることなく、エンジンで例外をキャッチするにはどうすればよいですか?

4

2 に答える 2

1

ちょっとばかげているように聞こえるかもしれませんが、エラーを示す特別な値、たとえば-1、またはNoneまたは文字列を返すことができます。map_async私がやったことを回避するには、パラメータをループして を使用しapply_async、結果をリストに保存します。次に、リストをループして結果を取得し、一度に 1 つずつ処理します。次のようになります。

 n_cores = len(c.ids)
 for n,p in enumerate( params ):
     core = c.ids[n%n_cores]
     calls.append( c[core].apply_async( f, p ) )

  #then you get the results

 while calls != []:
      for c in calls:
          try:
               result = c.get(1e-3)
               process(result)
               calls.remove( c )
               #in the case your call failed, you can apply_async again.
               # and append the call to calls.
          except parallel.TimeoutError:
               pass

または、代わりに を使用c[core].apply()して呼び出しを確認しますc.ready()。基本的に例外処理なしで同じこと。面倒なことは、すべての呼び出しのresultsと その他をクリアするのが難しいため、これが多くのメモリを消費することです。dict

私はここで同様のことをしていましたが、map_async がうまくいかないと判断しました。このアプローチを選択する場合、これも関連する可能性があります。

乾杯。

PS: 基本的にこれは上記で実装したものだと思いますが、呼び出しを個別に処理してからマップにスタックする方が自然だと思います。特に、後でそれらの一部を再処理する必要がある場合はそうです。

于 2013-10-23T11:41:55.447 に答える
0

ipython/*/examples/parallel/customresults.pyに触発されて、私はこの解決策を思いつきました:

asyncmap = balanced.map(specialfunc, param_list, ordered=False)

#create original mapping of msg_ids to parameters
# maybe just a quick way to find which parameter gave what result
msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))

pending = set(asyncmap.msg_ids) # all queued jobs are pending
while pending:   # we'll come back as long as finished jobs haven't been looked at yet
    try:
        client.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeouterrors, since they only mean that at least one isn't done
        pass

    # finished is the set of msg_ids that are complete
    finished = pending.difference(client.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = client.get_result(msg_id)
        # checking whether any exceptions occurred when code ran on the engine
        if ar.metadata['pyerr'] is None:
            print "job id %s finished on engine %i " % (msg_id, ar.engine_id)
            print "and results for parameter %i :" % msg_ids_to_parameters[msg_id]
            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for res in ar.result:
                print " item %i \n" % res
        else:
            print('this went wrong for %i (%s)' % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr']))

基本的に、コード例からの変更は、メタデータを見て、エラーが記録されているかどうかを確認し、記録されていない場合にのみ、 を介して結果を取得することar.resultでした。

于 2013-10-22T19:51:44.027 に答える