2

私は消費者/生産者優先度キューを実装しました。ここで、優先度は実際にはアイテムがいつ配送されるべきかを表すタイムスタンプです。それはかなりうまく機能しますが、これを実装するためのより良いアイデアがあるかどうか、または現在の実装についてコメントがあるかどうかを知りたいです。

コードは Python です。待機中のコンシューマーを時間通りにウェイクアップするために、単一のスレッドが作成されます。これがライブラリでスレッドを作成するためのアンチパターンであることはわかっていますが、別の方法を考案できませんでした。

コードは次のとおりです。

import collections
import heapq
import threading
import time

class TimelyQueue(threading.Thread):
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    class Locker:
        def __init__(self, lock):
            self.l = lock
        def __enter__(self):
            self.l.acquire()
            return self.l
        def __exit__(self, type, value, traceback):
            self.l.release()

    # Optimization to avoid wasting CPU cycles when something
    # is about to happen in less than 5 ms.
    _RESOLUTION = 0.005

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0

    def put(self, when, item):
        with self.Locker(self.putcond):
            heapq.heappush(self.queue, (when, item))
            if when < self.putwaketime or self.putwaketime == 0:
                self.putcond.notify()

    def get(self, timeout=None):
        with self.Locker(self.getcond):
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
                self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        with self.Locker(self.putcond):
            return len(self.queue)

    def run(self):
        with self.Locker(self.putcond):
            maxwait = None
            while True:
                curtime = time.time()
                try:
                    when, item = self.queue[0]
                    maxwait = when - curtime
                    self.putwaketime = when
                except IndexError:
                    maxwait = None
                    self.putwaketime = 0
                self.putcond.wait(maxwait)

                curtime = time.time()
                while True:
                    # Don't dequeue now, we are not sure to use it yet.
                    try:
                        when, item = self.queue[0]
                    except IndexError:
                        break
                    if when > curtime + self._RESOLUTION:
                        break

                    self.triggered.append(heapq.heappop(self.queue))
                if len(self.triggered) > 0:
                    with self.Locker(self.getcond):
                        self.getcond.notify()


if __name__ == "__main__":
    q = TimelyQueue()
    q.start()

    N = 50000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
4

2 に答える 2

0

ここでバックグラウンド スレッドが本当に必要なのは、時間切れになったときにウェイターをキックするタイマーだけですよね?

threading.Timerまず、明示的なバックグラウンド スレッドの代わりにそれを実装できます。しかし、それはより単純かもしれませんが、ユーザーが望むかどうかにかかわらず、ユーザーの背後にスレッドを作成しているという問題を実際に解決することはできません。また、 ではthreading.Timer、タイマーを再起動するたびに実際に新しいスレッドをスピンオフしているため、パフォーマンスの問題になる可能性があります。(一度に 1 つしかありませんが、スレッドの開始と停止は無料ではありません。)

PyPI モジュール、ActiveState レシピ、およびさまざまなフレームワークを見渡すと、1 つのバックグラウンド スレッドで複数のタイマーを実行できる実装が多数あります。それはあなたの問題を解決するでしょう。

しかし、それはまだ完全な解決策ではありません。たとえば、私のアプリに 20TimelyQueue個のオブジェクトが必要だとします。または、TimelyQueueすべてタイマーが必要なその他の 19 個のオブジェクトが必要だとします。私はまだ20のスレッドで終わるでしょう。または、ソケット サーバーまたは GUI アプリを構築しているとします (最も明白な 2 つのユース ケースですTimelyQueue。イベント ループの上にタイマーを実装できます (または、ほとんどの場合、フレームワーク) では、なぜスレッドが必要なのですか?

それを回避する方法は、任意のタイマー ファクトリを提供するためのフックを提供することです。

def __init__(self, timerfactory = threading.Timer):
    self.timerfactory = timerfactory
    ...

ここで、タイマーを調整する必要がある場合:

if when < self.waketime:
    self.timer.cancel()
    self.timer = self.timerfactory(when - now(), self.timercallback)
    self.waketime = when

クイック&ダーティユースケースの場合、これはそのままで十分です。しかし、たとえば を使用している場合は、 をtwisted使用するだけで済み、キューのタイマーはイベント ループTimelyQueue(twisted.reactor.callLater)を通過します。twistedまたは、別の場所で使用しているマルチタイマー 1 スレッドの実装がある場合TimelyQueue(multiTimer.add)、キューのタイマーは他のすべてのタイマーと同じスレッドで動作します。

必要に応じて、よりも優れたデフォルトを提供できますがthreading.Timer、実際には、より優れたものを必要とするほとんどの人は、あなたがthreading.Timer提供するものよりも特定のアプリに適したものを提供できると思います。

もちろん、すべてのタイマーの実装が -- と同じ API を持っているわけthreading.Timerではありません。しかし、使用したいタイマーがTimelyQueueあり、そのインターフェースが間違っている場合、アダプターを作成するのはそれほど難しくありません。たとえば、PyQt4/PySide アプリを構築QTimerしていて、cancelメソッドがなく、秒ではなくミリ秒かかる場合、次のようにする必要があります。

class AdaptedQTimer(object):
    def __init__(self, timeout, callback):
        self.timer = QTimer.singleShot(timeout * 1000, callback)
    def cancel(self):
        self.timer.stop()

q = TimelyQueue(AdaptedQTimer)

QObjectまたは、キューをより直接的に統合したい場合は、まとめQObject.startTimer()timerEvent(self)メソッドにコールバックを呼び出させることができます。

アダプターを検討している場合は、最後のアイデアです。価値はないと思いますが、検討する価値はあります。タイマーが timedelta ではなくタイムスタンプを取り、 ではadjustなく / の代わりにメソッドを持ちcancel、独自の を保持しているwaketime場合、TimelyQueue実装はよりシンプルになり、おそらくより効率的になります。ではput、次のようなものがあります。

if self.timer is None:
    self.timer = self.timerfactory(when)
elif when < self.timer.waketime:
    self.timer.adjust(when)

もちろん、ほとんどのタイマーはこのインターフェースを提供していません。しかし、誰かがそれを持っているか、それを作ろうとするなら、彼らは利益を得ることができます. threading.Timerそして、他のすべての人のために、スタイルのタイマーを必要な種類に変える単純なアダプターを提供できます。次のようなものです。

def timerFactoryAdapter(threadingStyleTimerFactory):
    class TimerFactory(object):
        def __init__(self, timestamp, callback):
            self.timer = threadingStyleTimerFactory(timestamp - now(), callback)
            self.callback = callback
        def cancel(self):
            return self.timer.cancel()
        def adjust(self, timestamp):
            self.timer.cancel()
            self.timer = threadingStyleTimerFactory(timestamp - now(), self.callback)
于 2012-12-05T23:25:59.990 に答える
0

記録のために、タイマー ファクトリを使用して、あなたが提案したものを実装しました。threading.Timer上記のバージョンとクラスを使用した新しいバージョンを使用して小さなベンチマークを実行しました。

  1. 最初の実装

    • デフォルトの解像度 (5 ミリ秒、つまり 5 ミリ秒のウィンドウ内のすべてが同時に起動される) では、約 88k put()/秒と 69k get()/秒を達成します。

    • 分解能を 0 ミリ秒 (最適化なし) に設定すると、約 88k put()/秒および 55k get()/秒を達成します。

  2. 2 回目の実装

    • デフォルトの解像度 (5 ミリ秒) では、約 88k put()/秒および 65k get()/秒を達成します。

    • 分解能を 0 ミリ秒に設定すると、約 88k put()/秒および 62k get()/秒を達成します。

私は、解像度の最適化なしで 2 番目の実装が高速であることに驚いたことを認めます。今さら調べても遅い。

import collections
import heapq
import threading
import time

class TimelyQueue:
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    def __init__(self, resolution=5, timerfactory=threading.Timer):
        """
        `resolution' is an optimization to avoid wasting CPU cycles when
        something is about to happen in less than X ms.
        """
        self.resolution = float(resolution) / 1000
        self.timerfactory = timerfactory
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0
        self.timer = None
        self.terminating = False

    def __arm(self):
        """
        Arm the next timer; putcond must be acquired!
        """
        curtime = time.time()
        when, item = self.queue[0]
        interval = when - curtime
        self.putwaketime = when
        self.timer = self.timerfactory(interval, self.__fire)
        self.timer.start()

    def __fire(self):
        with self.putcond:
            curtime = time.time()
            debug = 0
            while True:
                # Don't dequeue now, we are not sure to use it yet.
                try:
                    when, item = self.queue[0]
                except IndexError:
                    break
                if when > curtime + self.resolution:
                    break

                debug += 1
                self.triggered.append(heapq.heappop(self.queue))
            if len(self.triggered) > 0:
                with self.getcond:
                    self.getcond.notify(len(self.triggered))
            if self.terminating:
                return
            if len(self.queue) > 0:
                self.__arm()

    def put(self, when, item):
        """
        `when' is a Unix time from Epoch.
        """
        with self.putcond:
            heapq.heappush(self.queue, (when, item))
            if when >= self.putwaketime and self.putwaketime != 0:
                return
            # Arm next timer.
            if self.timer is not None:
                self.timer.cancel()
            self.__arm()

    def get(self, timeout=None):
        """
        Timely return the next object on the queue.
        """
        with self.getcond:
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
            self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        """
        Self explanatory.
        """
        with self.putcond:
            return len(self.queue)

    def terminate(self):
        """
        Request the embedded thread to terminate.
        """
        with self.putcond:
            self.terminating = True
            if self.timer is not None:
                self.timer.cancel()
            self.putcond.notifyAll()


if __name__ == "__main__":
    q = TimelyQueue(0)
    N = 100000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
    q.terminate()
    # Give change to the thread to exit properly, otherwise we may get
    # a stray interpreter exception.
    time.sleep(0.1)
于 2012-12-13T23:26:37.370 に答える