私は分散型の dask クラスターをセットアップしており、それを使用して一連のデータを読み込んで変換しました。魅力のように機能します。
私はそれを使用していくつかの処理を並行して行いたいと思っています。これが私の機能です
el = 5000
n_using = 26
n_across= 6
mat = np.random.random((el,n_using,n_across))
idx = np.tril_indices(n_across*2, -n_across)
def get_vals(c1, m, el, idx):
m1 = m[c1,:,:]
corr_vals = np.zeros((el, (n_across//2)*(n_across+1)))
for c2 in range(c1+1, el):
corr = np.corrcoef(m1.T, m[c2,:,:].T)
corr_vals[c2] = corr[idx]
return corr_vals
lazy_get_val = dask.delayed(get_vals, pure=True)
これは、私がやろうとしていることのシングル プロセッサ バージョンです。
arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)
正常に動作しますが、数時間かかります。これがダスクでこれを行うための私の行き方です:
lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)
実行all_corr[1].compute()
しても、そこに座っているだけで応答しません。カーネルを中断すると、/distributed/utils.py でスタックしているようです:
~/.../lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
249 else: 250 while not e.is_set(): --> 251 e.wait(10) 252 if error[0]: 253 six.reraise(*error[0])
これをデバッグするための提案はありますか?
他のもの:
- 小さい
mat
(el=1000) で実行すると、正常に実行されます。 - 私が作る
el = 5000
と、ハングします。 - カーネルを中断して再度実行すると
el = 1000
、ハングします。