11

リストがあるとしましょう:

y = ['1', '2', '3', '4','5','6','7','8','9','10']

n 日間の移動平均を計算する関数を作成したいと考えています。したがって、n5 の場合、コードで最初の 1 ~ 5 を計算し、それを加算して平均を求めます。これは 3.0 になります。次に 2 ~ 6 に進み、平均を計算します。これは 4.0、次に 3- になります。 7、4-8、5-9、6-10。

最初の n-1 日を計算したくないので、n 日目から前の日をカウントします。

def moving_average(x:'list of prices', n):
    for num in range(len(x)+1):
        print(x[num-n:num])

これは私が欲しいものを印刷するようです:

[]
[]
[]
[]
[]

['1', '2', '3', '4', '5']

['2', '3', '4', '5', '6']

['3', '4', '5', '6', '7']

['4', '5', '6', '7', '8']

['5', '6', '7', '8', '9']

['6', '7', '8', '9', '10']

ただし、それらのリスト内の数値を計算する方法がわかりません。何か案は?

4

5 に答える 5

23

Python ドキュメントの古いバージョンに、itertoolsを含む優れたスライディング ウィンドウ ジェネレーターがあります。

from itertools import islice

def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result    
    for elem in it:
        result = result[1:] + (elem,)
        yield result

それを使用すると、移動平均は簡単です。

from __future__ import division  # For Python 2

def moving_averages(values, size):
    for selection in window(values, size):
        yield sum(selection) / size

これを入力に対して実行すると (文字列を整数にマッピング)、次のようになります。

>>> y= ['1', '2', '3', '4','5','6','7','8','9','10']
>>> for avg in moving_averages(map(int, y), 5):
...     print(avg)
... 
3.0
4.0
5.0
6.0
7.0
8.0

「不完全な」セットのNone最初の反復を返すには、関数を少し拡張します。n - 1moving_averages

def moving_averages(values, size):
    for _ in range(size - 1):
        yield None
    for selection in window(values, size):
        yield sum(selection) / size
于 2013-02-14T21:07:58.073 に答える
7

ジョージのように、これに関するMartijnの答えが好きですがsum()、ほとんど同じ数字に何度も何度も適用する代わりに、実行中の合計を使用することでこれが速くならないのではないかと思っていました。

Noneまた、ランプアップ フェーズ中に値をデフォルトとして設定するというアイデアも興味深いものです。実際、移動平均について考えられるさまざまなシナリオがたくさんあるかもしれません。平均の計算を 3 つのフェーズに分けてみましょう。

  1. ランプアップ: 現在の反復回数 < ウィンドウ サイズで反復を開始する
  2. 着実な進歩: 法線を計算するために使用できる正確なウィンドウ サイズの数の要素があります。average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
  3. Ramp Down: 入力データの最後に、別のwindow_size - 1「平均」数値を返すことができます。

ここに受け入れる関数があります

  • データの入力としての任意のイテラブル (ジェネレーターは問題ありません)
  • 任意のウィンドウ サイズ >= 1
  • Ramp Up/Down のフェーズ中に値の生成をオン/オフするパラメータ
  • 値の生成方法を制御するこれらのフェーズのコールバック関数。これを使用して、デフォルト (例: ) を常に提供しNoneたり、部分的な平均を提供したりできます。

コードは次のとおりです。

from collections import deque 

def moving_averages(data, size, rampUp=True, rampDown=True):
    """Slide a window of <size> elements over <data> to calc an average

    First and last <size-1> iterations when window is not yet completely
    filled with data, or the window empties due to exhausted <data>, the
    average is computed with just the available data (but still divided
    by <size>).
    Set rampUp/rampDown to False in order to not provide any values during
    those start and end <size-1> iterations.
    Set rampUp/rampDown to functions to provide arbitrary partial average
    numbers during those phases. The callback will get the currently
    available input data in a deque. Do not modify that data.
    """
    d = deque()
    running_sum = 0.0

    data = iter(data)
    # rampUp
    for count in range(1, size):
        try:
            val = next(data)
        except StopIteration:
            break
        running_sum += val
        d.append(val)
        #print("up: running sum:" + str(running_sum) + "  count: " + str(count) + "  deque: " + str(d))
        if rampUp:
            if callable(rampUp):
                yield rampUp(d)
            else:
                yield running_sum / size

    # steady
    exhausted_early = True
    for val in data:
        exhausted_early = False
        running_sum += val
        #print("st: running sum:" + str(running_sum) + "  deque: " + str(d))
        yield running_sum / size
        d.append(val)
        running_sum -= d.popleft()

    # rampDown
    if rampDown:
        if exhausted_early:
            running_sum -= d.popleft()
        for (count) in range(min(len(d), size-1), 0, -1):
            #print("dn: running sum:" + str(running_sum) + "  deque: " + str(d))
            if callable(rampDown):
                yield rampDown(d)
            else:
                yield running_sum / size
            running_sum -= d.popleft()

Martijn のバージョンよりも少し速いようですが、はるかにエレガントです。テストコードは次のとおりです。

print("")
print("Timeit")
print("-" * 80)

from itertools import islice
def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result    
    for elem in it:
        result = result[1:] + (elem,)
        yield result

# Martijn's version:
def moving_averages_SO(values, size):
    for selection in window(values, size):
        yield sum(selection) / size


import timeit
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)]
for problem_size in problems:
    print("{:12s}".format(str(problem_size)), end="")

    so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size,
                       setup="from __main__ import moving_averages_SO")
    print("{:12.3f} ".format(min(so)), end="")

    my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size,
                       setup="from __main__ import moving_averages")
    print("{:12.3f} ".format(min(my)), end="")

    print("")

そして出力:

Timeit
--------------------------------------------------------------------------------
10                 7.242        7.656 
100                5.816        5.500 
1000               5.787        5.244 
10000              5.782        5.180 
100000             5.746        5.137 
1000000            5.745        5.198 
10000000           5.764        5.186 

元の質問は、次の関数呼び出しで解決できるようになりました。

print(list(moving_averages(range(1,11), 5,
                           rampUp=lambda _: None,
                           rampDown=False)))

出力:

[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
于 2013-02-18T18:15:14.937 に答える
2

中間合計の再計算を回避するアプローチ。

list=range(0,12)
def runs(v):
 global runningsum
 runningsum+=v
 return(runningsum)
runningsum=0
runsumlist=[ runs(v) for v in list ]
result = [ (runsumlist[k] - runsumlist[k-5])/5 for k in range(0,len(list)+1)]

印刷結果

[2,3,4,5,6,7,8,9]

数字と文字列を持ち歩きたくない場合は、runs(int(v)) .. then .. repr( runsumlist[k] - runsumlist[k-5])/5 ) を作成します。


グローバルなしの Alt:

list = [float[x] for x in range(0,12)]
nave = 5
movingave = sum(list[:nave]/nave)
for i in range(len(list)-nave):movingave.append(movingave[-1]+(list[i+nave]-list[i])/nave)
print movingave 

入力値が整数であっても浮動小数点演算を行うようにしてください

[2.0,3.0,4.0,5.0,6.0,7.0,8.0,9,0]
于 2013-02-14T22:04:03.567 に答える
1

sumおよび関数を使用しmapます。

print(sum(map(int, x[num-n:num])))

mapPython 3の関数は、基本的にこれの遅延バージョンです。

[int(i) for i in x[num-n:num]]

sum関数が何をするかは、きっとお分かりいただけると思います。

于 2013-02-14T21:07:40.763 に答える
0

itertoolsレシピを拡張する別のソリューションがありpairwise()ます。これを に拡張するnwise()と、スライディング ウィンドウが得られます (反復可能オブジェクトがジェネレータの場合に機能します)。

def nwise(iterable, n):
    ts = it.tee(iterable, n)
    for c, t in enumerate(ts):
        next(it.islice(t, c, c), None)
    return zip(*ts)

def moving_averages_nw(iterable, n):
    yield from (sum(x)/n for x in nwise(iterable, n))

>>> list(moving_averages_nw(range(1, 11), 5))
[3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

short のセットアップ コストは比較的高くなりますが、iterableこのコストはデータ セットが長くなるほど影響が少なくなります。これは使用しますsum()が、コードはかなりエレガントです:

Timeit              MP           cfi         *****
--------------------------------------------------------------------------------
10                 4.658        4.959        7.351 
100                5.144        4.070        4.234 
1000               5.312        4.020        3.977 
10000              5.317        4.031        3.966 
100000             5.508        4.115        4.087 
1000000            5.526        4.263        4.202 
10000000           5.632        4.326        4.242 
于 2016-11-26T14:59:58.190 に答える