4

統計アキュムレータを使用すると、増分計算を実行できます。たとえば、任意の時間に与えられた数値のストリームの算術平均を計算するために、与えられたアイテムの現在の数nとそれらの合計を追跡するオブジェクトを作成できますsum。平均を要求すると、オブジェクトは単純に を返しますsum/n

このようなアキュムレータを使用すると、新しい数値が与えられたときに、合計とカウント全体を再計算する必要がないという意味で、インクリメンタルに計算できます。

同様のアキュムレータを他の統計用に作成できます ( C++ 実装用のブースト ライブラリを参照)。

Python でアキュムレータをどのように実装しますか? 私が思いついたコードは次のとおりです。

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

興味深い設計上の問題が発生します。

  1. アキュムレータをスレッドセーフにする方法は?
  2. アイテムを安全に削除するには?
  3. 他の統計を簡単にプラグインできるように設計する方法 (統計のファクトリ)
4

2 に答える 2

3

一般化されたスレッドセーフな高レベル関数の場合、次のようなものをQueue.Queueクラスと他のビットと組み合わせて使用​​できます。

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

このジェネレーター関数は、他のエンティティに委譲することで、その責任のほとんどを回避します。

  • Queue.Queueスレッドセーフな方法でソース要素を提供することに依存しています
  • パラメータのcollections.deque値としてオブジェクトを渡すことができます。storageこれにより、とりわけ、最後のn(この場合は 3 つの) 値のみを使用する便利な方法が提供されます。
  • 関数自体 (この場合meanは ) がパラメーターとして渡されます。これは場合によっては最適とは言えない効率的なコードになりますが、あらゆる種類の状況に容易に適用できます。

プロデューサ スレッドが値ごとに 0.1 秒以上かかる場合、アキュムレータがタイムアウトする可能性があることに注意してください。これは、より長いタイムアウトを渡すか、タイムアウト パラメータを完全に削除することで簡単に解決できます。後者の場合、関数はキューの最後で無期限にブロックされます。この使用法は、サブスレッド (通常はスレッド) で使用されている場合に、より意味がありdaemonます。もちろんq.get、 の 4 番目の引数として に渡される引数をパラメータ化することもできますAccumulator

キューの終わりを通信したい場合、つまり、これ以上値が来ないことをプロデューサー スレッド (ここputting_thread) から伝えたい場合は、センチネル値を渡してチェックするか、他の方法を使用できます。このスレッドに詳細があります。メソッドを提供するCloseableQueue というQueue.Queueのサブクラスを作成することにしましたclose

このような関数の動作をカスタマイズするには、他にもさまざまな方法があります。たとえば、キュ​​ーのサイズを制限するなどです。これは使用例です。

編集

上記のように、これは再計算が必要なため効率が低下し、また、あなたの質問に実際には答えていないと思います。

ジェネレーター関数は、そのsendメソッドを介して値を受け入れることもできます。したがって、次のような平均生成関数を書くことができます

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

ここで、yield 式は値 —sendへの引数 — を関数に取り込み、同時に計算された値を の戻り値として渡しますsend

その関数の呼び出しによって返されたジェネレーターを、次のようなより最適化可能なアキュムレータ ジェネレーター関数に渡すことができます。

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass
于 2010-09-23T05:04:36.347 に答える
1

これを Python で行う場合、次の 2 つの点が異なります。

  1. 各アキュムレータの機能を分離します。
  2. @property を使用しないでください。

最初のものについては、おそらく次のような、累積を実行するための API を考え出す必要があります。

def add(self, num) # add a number
def compute(self) # compute the value of the accumulator

次に、これらのアキュムレータを保持する AccumulatorRegistry を作成し、ユーザーがアクションを呼び出してそれらすべてに追加できるようにします。コードは次のようになります。

class Accumulators(object):
    _accumulator_library = {}

    def __init__(self):
        self.accumulator_library = {}
        for key, value in Accumulators._accumulator_library.items():
            self.accumulator_library[key] = value()

    @staticmethod
    def register(name, accumulator):
        Accumulators._accumulator_library[name] = accumulator

    def add(self, num):
        for accumulator in self.accumulator_library.values():
            accumulator.add(num)

    def compute(self, name):
        self.accumulator_library[name].compute()

    @staticmethod
    def register_decorator(name):
        def _inner(cls):
            Accumulators.register(name, cls)
            return cls


@Accumulators.register_decorator("Mean")
class Mean(object):
    def __init__(self):
        self.total = 0
        self.count = 0

    def add(self, num):
        self.count += 1
        self.total += num

    def compute(self):
        return self.total / float(self.count)

おそらく、スレッドセーフな質問について話す必要があります。Python の GIL は、多くのスレッド化の問題からユーザーを保護します。ただし、身を守るためにできることがいくつかあります。

  • これらのオブジェクトが 1 つのスレッドにローカライズされている場合は、threading.local を使用します。
  • そうでない場合は、 with context 構文を使用して操作をロックでラップし、ロックの保持を処理できます。
于 2010-09-22T23:35:15.777 に答える