1

dask 配列に適用したい `sig2z` という関数があります:

def sig2z(da, zr, zi, nvar=None, dim=None, coord=None):
    r"""
    Interpolate variables on \sigma coordinates onto z coordinates.

    Each horizontal column whose deepest level lies below -100 is
    interpolated with ``_interpolate`` onto the target depths that lie
    above the bottom of that column. All other entries of the output
    stay NaN.

    Parameters
    ----------
    da : array-like
        The data on sigma coordinates to be interpolated. Must be 3-D or
        4-D, with the last three axes being (level, j, i). Its ``dims``
        and ``coords`` attributes are read when `dim` / `coord` are not
        given.
    zr : array-like
        The depths corresponding to sigma layers, with the deepest level
        at index 0 and the same shape as the last three axes of `da`.
        Must provide ``.shift(eta_rho=..., xi_rho=...)`` when `nvar` is
        ``'u'`` or ``'v'``.
    zi : `numpy.array`
        The depths which to interpolate the data on; non-decreasing with
        a positive maximum.
    nvar : str (optional)
        Name of the variable. Only necessary when the variable is
        horizontal velocity (``'u'`` or ``'v'``), in which case the
        depths are averaged between neighbouring points.
    dim : sequence (optional)
        Dimension names for the returned array. Defaults to ``da.dims``.
    coord : mapping (optional)
        Coordinates for the returned array. Defaults to ``da.coords``.

    Returns
    -------
    dai : `xarray.DataArray`
        The data interpolated onto a spatial uniform z coordinate.

    Raises
    ------
    ValueError
        If `zi` is empty, decreasing anywhere or has no positive value,
        if `zr` is not ordered deepest-first, if the shapes of `zr` and
        `da` disagree, or if `da` is not 3-D or 4-D.
    """

    zi = np.asarray(zi)
    # Check every spacing, not just the first one, and do not index into an
    # empty diff when `zi` has a single element.
    if zi.size == 0 or np.any(np.diff(zi) < 0.) or zi.max() <= 0.:
        raise ValueError("The values in `zi` should be postive and increasing.")
    if np.any(np.absolute(zr[0]) < np.absolute(zr[-1])):
        raise ValueError("`zr` should have the deepest depth at index 0.")
    if zr.shape != da.shape[-3:]:
        raise ValueError("`zr` should have the same "
                        "spatial dimensions as `da`.")

    # NOTE(review): the vertical axis of the output has len(zi) entries, which
    # need not match the sigma-level axis described by da.dims / da.coords --
    # confirm that the defaults are appropriate for the caller.
    if dim is None:
        dim = da.dims
    if coord is None:
        coord = da.coords

    N = da.shape
    nzi = len(zi)
    if len(N) == 4:
        out_shape = (N[0], nzi, N[-2], N[-1])
    elif len(N) == 3:
        out_shape = (nzi, N[-2], N[-1])
    else:
        raise ValueError("The data should at least have three dimensions")
    dai = np.full(out_shape, np.nan)

    zi = -zi[::-1] # ROMS has deepest level at index=0

    if nvar == 'u':  # u variables
        zl = .5*(zr.shift(eta_rho=-1, xi_rho=-1)
                 + zr.shift(eta_rho=-1)
                )
    elif nvar == 'v': # v variables
        zl = .5*(zr.shift(xi_rho=-1)
                 + zr.shift(eta_rho=-1, xi_rho=-1)
                )
    else:
        zl = zr

    for i in range(N[-1]):
        for j in range(N[-2]):
            zcol = zl[:, j, i]
            zmin = float(zcol.min())
            # only bother for sufficiently deep regions
            # (a NaN minimum also fails this test and is skipped)
            if not zmin < -1e2:
                continue
            # only interp on z above topo
            above = zi >= zmin
            if not above.any():
                # every target depth is below the bottom: leave NaN
                continue
            # `zi` is non-decreasing, so the valid targets form the tail
            # that starts at the first True entry.
            first = int(np.argmax(above))
            targets = zi[first:]
            nvalid = len(targets)
            if len(N) == 4:
                for s in range(N[0]):
                    dai[s, :nvalid, j, i] = _interpolate(da[s, :, j, i].copy(),
                                                         zcol.copy(),
                                                         targets
                                                        )
            else:
                dai[:nvalid, j, i] = _interpolate(da[:, j, i].copy(),
                                                  zcol.copy(),
                                                  targets
                                                 )

    return xr.DataArray(dai, dims=dim, coords=coord)

これは xarray.DataArray では正常に機能しますが、dask.array に適用すると、次のエラーが発生します。

# `dim` and `coord` are passed by keyword: given positionally after `zi` they
# would bind to sig2z's `nvar` and `dim` parameters instead.
test = dsar.map_blocks(sig2z, w[0].data,
                      zr.chunk({'eta_rho':1,'xi_rho':1}).data, zi,
                      dim=dim, coord=coord,
                      chunks=dai[0].chunks, dtype=dai.dtype
                      ).compute()

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-29-d81bad2f4486> in <module>()
----> 1 test.compute()

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
     95             Extra keywords to forward to the scheduler ``get`` function.
     96         """
---> 97         (result,) = compute(self, traverse=False, **kwargs)
     98         return result
     99 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    202     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    203     keys = [var._keys() for var in variables]
--> 204     results = get(dsk, keys, **kwargs)
    205 
    206     results_iter = iter(results)

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     58         if exc.__traceback__ is not tb:
     59             raise exc.with_traceback(tb)
---> 60         raise exc
     61 
     62 else:

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/array/core.py in getarray(a, b, lock)
     63         c = a[b]
     64         if type(c) != np.ndarray:
---> 65             c = np.asarray(c)
     66     finally:
     67         if lock:

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    425 
    426     def __array__(self, dtype=None):
--> 427         self._ensure_cached()
    428         return np.asarray(self.array, dtype=dtype)
    429 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in _ensure_cached(self)
    422     def _ensure_cached(self):
    423         if not isinstance(self.array, np.ndarray):
--> 424             self.array = np.asarray(self.array)
    425 
    426     def __array__(self, dtype=None):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    406 
    407     def __array__(self, dtype=None):
--> 408         return np.asarray(self.array, dtype=dtype)
    409 
    410     def __getitem__(self, key):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    373     def __array__(self, dtype=None):
    374         array = orthogonally_indexable(self.array)
--> 375         return np.asarray(array[self.key], dtype=None)
    376 
    377     def __getitem__(self, key):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    373     def __array__(self, dtype=None):
    374         array = orthogonally_indexable(self.array)
--> 375         return np.asarray(array[self.key], dtype=None)
    376 
    377     def __getitem__(self, key):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in __getitem__(self, key)
     58         with self.datastore.ensure_open(autoclose=True):
     59             try:
---> 60                 data = getitem(self.get_array(), key)
     61             except IndexError:
     62                 # Catch IndexError in netCDF4 and return a more informative

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:39743)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:49835)()

RuntimeError: Resource temporarily unavailable

このエラーが発生する理由を教えてください。前もって感謝します。

4

1 に答える 1