ipythonの並列処理を使ってデータを並列処理しようとしています。ipython並列処理で中間結果を取得する方法に関する質問への回答として、@minrkの指示に従っていますか? . データが異種であるため、一部の処理タスクは他のタスクよりも早く終了します。それらが利用可能になったらすぐに保存したいと考えています。私は次の方法でこれを行います:
from IPython.parallel import Client
def specialfunc(param):
import time
if param > 8:
raise IOError
else:
time.sleep( param)
return param
client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
次に、asyncmap をループして、準備ができたら結果を利用できるようにします。
for i in asyncmap:
print i
問題は、私のコードが例外をスローすることがあることです (上記の例では、呼び出しパラメーターが 8 を超えると IOError が強制的に発生します)、これに対処したいと考えています。ただし、エンジンの 1 つがぐらつきをスローするとすぐに、asyncmap 全体が終了したように見えます。
私が実際に気づいたのは、asyncmap.metadata を問い合わせると、どのメッセージがエラーを出したか (asyncmap.metadata[i]['pyerr']) を非常によく把握できることですが、結果が次のようになるのを待つ方法がわからないということです。彼らはそうします。
したがって、私の質問は、エンジンから非同期に到着した結果を、例外がスローされることがあっても、どのように処理すればよいかということです。コントローラーでの結果の待機を混乱させることなく、エンジンで例外をキャッチするにはどうすればよいですか?