私は消費者/生産者優先度キューを実装しました。ここで、優先度は実際にはアイテムがいつ配送されるべきかを表すタイムスタンプです。それはかなりうまく機能しますが、これを実装するためのより良いアイデアがあるかどうか、または現在の実装についてコメントがあるかどうかを知りたいです。
コードは Python です。待機中のコンシューマーを時間通りにウェイクアップするために、単一のスレッドが作成されます。これがライブラリでスレッドを作成するためのアンチパターンであることはわかっていますが、別の方法を考案できませんでした。
コードは次のとおりです。
import collections
import heapq
import threading
import time
class TimelyQueue(threading.Thread):
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
class Locker:
def __init__(self, lock):
self.l = lock
def __enter__(self):
self.l.acquire()
return self.l
def __exit__(self, type, value, traceback):
self.l.release()
# Optimization to avoid wasting CPU cycles when something
# is about to happen in less than 5 ms.
_RESOLUTION = 0.005
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
def put(self, when, item):
with self.Locker(self.putcond):
heapq.heappush(self.queue, (when, item))
if when < self.putwaketime or self.putwaketime == 0:
self.putcond.notify()
def get(self, timeout=None):
with self.Locker(self.getcond):
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
with self.Locker(self.putcond):
return len(self.queue)
def run(self):
with self.Locker(self.putcond):
maxwait = None
while True:
curtime = time.time()
try:
when, item = self.queue[0]
maxwait = when - curtime
self.putwaketime = when
except IndexError:
maxwait = None
self.putwaketime = 0
self.putcond.wait(maxwait)
curtime = time.time()
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self._RESOLUTION:
break
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.Locker(self.getcond):
self.getcond.notify()
if __name__ == "__main__":
q = TimelyQueue()
q.start()
N = 50000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)