61

pylabプログラム(おそらくmatlabプログラムでもある可能性があります)では、距離を表す数値のnumpy配列があります:時間d[t]距離ですt(データのタイムスパンはlen(d)時間単位です)。

私が興味を持っているイベントは、距離が特定のしきい値を下回ったときであり、これらのイベントの期間を計算したいと考えています。を使用してブール値の配列を取得するのは簡単ですがb = d<threshold、問題は の True のみの単語の長さのシーケンスを計算することになりますb。しかし、私はそれを効率的に行う方法を知りません (つまり、numpy プリミティブを使用します)。配列をウォークし、手動で変更を検出することに頼りました (つまり、値が False から True になったときにカウンターを初期化し、値が True である限りカウンターを増やします)。値が False に戻ったときにシーケンスにカウンタを出力します)。しかし、これは非常に遅いです。

numpy 配列でそのようなシーケンスを効率的に検出するにはどうすればよいですか?

以下は私の問題を説明するいくつかの python コードです: 4 番目のドットが表示されるまでに非常に長い時間がかかります (そうでない場合は、配列のサイズを増やしてください)。

from pylab import *

threshold = 7

print '.'
d = 10*rand(10000000)

print '.'

b = d<threshold

print '.'

durations=[]
for i in xrange(len(b)):
    if b[i] and (i==0 or not b[i-1]):
        counter=1
    if  i>0 and b[i-1] and b[i]:
        counter+=1
    if (b[i-1] and not b[i]) or i==len(b)-1:
        durations.append(counter)

print '.'
4

6 に答える 6

67

任意の配列に対して完全に numpy ベクトル化された一般的な RLE (文字列、ブール値などでも動作します)。

ランの長さ、開始位置、および値のタプルを出力します。

import numpy as np

def rle(inarray):
        """ run length encoding. Partial credit to R rle function. 
            Multi datatype arrays catered for including non Numpy
            returns: tuple (runlengths, startpositions, values) """
        ia = np.asarray(inarray)                # force numpy
        n = len(ia)
        if n == 0: 
            return (None, None, None)
        else:
            y = ia[1:] != ia[:-1]               # pairwise unequal (string safe)
            i = np.append(np.where(y), n - 1)   # must include last element posi
            z = np.diff(np.append(-1, i))       # run lengths
            p = np.cumsum(np.append(0, z))[:-1] # positions
            return(z, p, ia[i])

かなり速い(i7):

xx = np.random.randint(0, 5, 1000000)
%timeit yy = rle(xx)
100 loops, best of 3: 18.6 ms per loop

複数のデータ型:

rle([True, True, True, False, True, False, False])
Out[8]: 
(array([3, 1, 1, 2]),
 array([0, 3, 4, 5]),
 array([ True, False,  True, False], dtype=bool))

rle(np.array([5, 4, 4, 4, 4, 0, 0]))
Out[9]: (array([1, 4, 2]), array([0, 1, 5]), array([5, 4, 0]))

rle(["hello", "hello", "my", "friend", "okay", "okay", "bye"])
Out[10]: 
(array([2, 1, 1, 2, 1]),
 array([0, 2, 3, 4, 6]),
 array(['hello', 'my', 'friend', 'okay', 'bye'], 
       dtype='|S6'))

上記の Alex Martelli と同じ結果:

xx = np.random.randint(0, 2, 20)

xx
Out[60]: array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1])

am = runs_of_ones_array(xx)

tb = rle(xx)

am
Out[63]: array([4, 5, 2, 5])

tb[0][tb[2] == 1]
Out[64]: array([4, 5, 2, 5])

%timeit runs_of_ones_array(xx)
10000 loops, best of 3: 28.5 µs per loop

%timeit rle(xx)
10000 loops, best of 3: 38.2 µs per loop

Alex よりも少し遅く (それでも非常に高速)、はるかに柔軟です。

于 2015-09-20T15:30:43.500 に答える
56

numpyプリミティブではありませんが、itertools関数は非常に高速であることが多いため、これを試してみてください (もちろん、これを含むさまざまなソリューションの時間を測定してください)。

def runs_of_ones(bits):
  for bit, group in itertools.groupby(bits):
    if bit: yield sum(group)

リスト内の値が必要な場合は、もちろん list(runs_of_ones(bits)) を使用できます。しかし、おそらくリスト内包表記の方がわずかに速いかもしれません:

def runs_of_ones_list(bits):
  return [sum(g) for b, g in itertools.groupby(bits) if b]

「numpy-native」の可能性に移行すると、次のようになります。

def runs_of_ones_array(bits):
  # make sure all runs of ones are well-bounded
  bounded = numpy.hstack(([0], bits, [0]))
  # get 1 at run starts and -1 at run ends
  difs = numpy.diff(bounded)
  run_starts, = numpy.where(difs > 0)
  run_ends, = numpy.where(difs < 0)
  return run_ends - run_starts

繰り返しますが、現実的な例でソリューションを相互にベンチマークしてください。

于 2009-07-01T01:04:34.990 に答える
12

配列のみを使用したソリューションを次に示します。ブール値のシーケンスを含む配列を取り、遷移の長さをカウントします。

>>> from numpy import array, arange
>>> b = array([0,0,0,1,1,1,0,0,0,1,1,1,1,0,0], dtype=bool)
>>> sw = (b[:-1] ^ b[1:]); print sw
[False False  True False False  True False False  True False False False
  True False]
>>> isw = arange(len(sw))[sw]; print isw
[ 2  5  8 12]
>>> lens = isw[1::2] - isw[::2]; print lens
[3 4]

swスイッチがある場所に true を含み、iswそれらをインデックスに変換します。isw の項目は、 でペアごとに減算されlensます。

シーケンスが 1 で始まる場合、0 シーケンスの長さをカウントすることに注意してください。これは、lens を計算するインデックスで修正できます。また、長さ 1 のシーケンスなどのコーナー ケースはテストしていません。


すべてのサブ配列の開始位置と長さを返す完全な関数True

import numpy as np

def count_adjacent_true(arr):
    assert len(arr.shape) == 1
    assert arr.dtype == np.bool
    if arr.size == 0:
        return np.empty(0, dtype=int), np.empty(0, dtype=int)
    sw = np.insert(arr[1:] ^ arr[:-1], [0, arr.shape[0]-1], values=True)
    swi = np.arange(sw.shape[0])[sw]
    offset = 0 if arr[0] else 1
    lengths = swi[offset+1::2] - swi[offset:-1:2]
    return swi[offset:-1:2], lengths

さまざまな bool 1D 配列 (空の配列、単一/複数の要素、偶数/奇数の長さ、 /で始まる、/ 要素のみ) についてテストされてTrueいますFalseTrueFalse

于 2009-07-01T10:32:35.690 に答える
6

誰かが興味を持っている場合に備えて(そして、MATLABについて言及したので)、MATLABでそれを解決する1つの方法を次に示します。

threshold = 7;
d = 10*rand(1,100000);  % Sample data
b = diff([false (d < threshold) false]);
durations = find(b == -1)-find(b == 1);

私は Python にあまり詳しくありませんが、これがヒントになるかもしれません。=)

于 2009-07-01T01:15:16.030 に答える
2

パーティーには遅れているかもしれませんが、Numba ベースのアプローチは断然最速です。

import numpy as np
import numba as nb


@nb.njit
def count_steps_nb(arr):
    result = 1
    last_x = arr[0]
    for x in arr[1:]:
        if last_x != x:
            result += 1
            last_x = x
    return result


@nb.njit
def rle_nb(arr):
    n = count_steps_nb(arr)
    values = np.empty(n, dtype=arr.dtype)
    lengths = np.empty(n, dtype=np.int_)
    last_x = arr[0]
    length = 1
    i = 0
    for x in arr[1:]:
        if last_x != x:
            values[i] = last_x
            lengths[i] = length
            i += 1
            length = 1
            last_x = x
        else:
            length += 1
    values[i] = last_x
    lengths[i] = length
    return values, lengths

Numpy ベースのアプローチ ( @ThomasBrowne の回答に触発されましたが、高価なものの使用がnumpy.concatenate()最小限に抑えられているため高速です) が次点です (ここでは 2 つの同様のアプローチが提示されています。もう1つは違いを使用しています):

import numpy as np


def rle_np_neq(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] != arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
    return values, lengths
import numpy as np


def rle_np_diff(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] - arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
        return values, lengths

これらは両方とも、素朴で単純な単一ループのアプローチよりも優れています。

import numpy as np


def rle_loop(arr):
    values = []
    lengths = []
    last_x = arr[0]
    length = 1
    for x in arr[1:]:
        if last_x != x:
            values.append(last_x)
            lengths.append(length)
            length = 1
            last_x = x
        else:
            length += 1
    values.append(last_x)
    lengths.append(length)
    return np.array(values), np.array(lengths)

それどころか、一般にグループサイズ情報を抽出する簡単な方法がないため、使用itertools.groupby()は単純なループよりも高速になることはありません( @AlexMartelliの回答のような非常に特殊なケースまたは誰かがグループオブジェクトに実装する場合を除く)。__len__グループ自体をループする以外は、正確には高速ではありません。

import itertools
import numpy as np


def rle_groupby(arr):
    values = []
    lengths = []
    for x, group in itertools.groupby(arr):
        values.append(x)
        lengths.append(sum(1 for _ in group))
    return np.array(values), np.array(lengths)

さまざまなサイズのランダムな整数配列に対するいくつかのベンチマークの結果が報告されています。

bm_full bm_ズーム

(完全な分析はこちら)。

于 2021-10-24T01:26:11.107 に答える
0
durations = []
counter   = 0

for bool in b:
    if bool:
        counter += 1
    elif counter > 0:
        durations.append(counter)
        counter = 0

if counter > 0:
    durations.append(counter)
于 2009-07-01T00:31:02.297 に答える