私は DASK を初めて使用し、クラスターでの DASK の実行をテストしたいと考えています。クラスターには、ヘッド サーバーと他のいくつかのノードがあります。ヘッドサーバーにログインすると、パスワードなしの単純な ssh で他のノードに入ることができます。大きな配列を反復する単純な関数を実行したいと思います。関数は以下で定義されます。dt64 を numpy の datetime オブジェクトに変換することです。
import xarray as xr
import numpy as np
from dask import compute, delayed
import dask.multiprocessing
from datetime import datetime, timedelta
def converdt64(dt64):
ts = (dt64 - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's')
return datetime.utcfromtimestamp(ts)
次に、ターミナルで、この関数を適用してサイズ N の配列 1D を反復処理します。
values = [delayed(convertdt64)(x) for x in arraydata]
results1 = compute(*values,scheduler='processes’)
これは、ヘッド サーバーでいくつかのコアを使用し、ゆっくりと動作します。次に、以下のようにクライアントを使用して、クラスターのいくつかのノードで関数を起動しようとしました。
from dask.distributed import Client
client = Client("10.140.251.254:8786 »)
results = compute(*values, scheduler='distributed’)
まったく機能しません。次のようないくつかの警告と 1 つのエラー メッセージがあります。
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://10.140.251.254:57257 remote=tcp://10.140.251.254:8786>
CancelledError: convertdt64-0205ad5e-214b-4683-b5c4-b6a2a6d8e52f
dask.bag も試してみましたが、同じエラー メッセージが表示されました。クラスターでの並列計算が機能しない理由は何ですか? サーバー/ネットワーク構成のせいですか、それとも DASK クライアントの間違った使い方ですか? よろしくお願いいたします。
幸運をお祈りしています
シャノンX