28

2 つのthreading.Event()オブジェクトがあり、いずれかが設定されるまでスリープしたい場合、Python でそれを行う効率的な方法はありますか? select明らかに、ポーリング/タイムアウトで何かを行うことができますが、ファイル記述子に使用される方法と同様に、設定されるまでスレッドをスリープさせたいと思います。

では、次の実装では、効率的な非ポーリング実装はwait_for_eitherどのようになりますか?

a = threading.Event()
b = threading.Event()

wait_for_either(a, b)
4

8 に答える 8

21

これは、非ポーリングで過度のスレッドを使用しないソリューションです。既存Eventの を変更して、変更されるたびにコールバックを起動し、そのコールバックで新しいイベントの設定を処理します。

import threading

def or_set(self):
    self._set()
    self.changed()

def or_clear(self):
    self._clear()
    self.changed()

def orify(e, changed_callback):
    e._set = e.set
    e._clear = e.clear
    e.changed = changed_callback
    e.set = lambda: or_set(e)
    e.clear = lambda: or_clear(e)

def OrEvent(*events):
    or_event = threading.Event()
    def changed():
        bools = [e.is_set() for e in events]
        if any(bools):
            or_event.set()
        else:
            or_event.clear()
    for e in events:
        orify(e, changed)
    changed()
    return or_event

使用例:

def wait_on(name, e):
    print "Waiting on %s..." % (name,)
    e.wait()
    print "%s fired!" % (name,)

def test():
    import time

    e1 = threading.Event()
    e2 = threading.Event()

    or_e = OrEvent(e1, e2)

    threading.Thread(target=wait_on, args=('e1', e1)).start()
    time.sleep(0.05)
    threading.Thread(target=wait_on, args=('e2', e2)).start()
    time.sleep(0.05)
    threading.Thread(target=wait_on, args=('or_e', or_e)).start()
    time.sleep(0.05)

    print "Firing e1 in 2 seconds..."
    time.sleep(2)
    e1.set()
    time.sleep(0.05)

    print "Firing e2 in 2 seconds..."
    time.sleep(2)
    e2.set()
    time.sleep(0.05)

その結果は次のとおりです。

Waiting on e1...
Waiting on e2...
Waiting on or_e...
Firing e1 in 2 seconds...
e1 fired!or_e fired!

Firing e2 in 2 seconds...
e2 fired!

これはスレッドセーフである必要があります。どんなコメントでも大歓迎です。

編集:ああ、ここにあなたのwait_for_either関数がありますが、私がコードを書いた方法ですが、 を作成して渡すのが最善or_eventです. or_eventを手動で設定またはクリアしないでください。

def wait_for_either(e1, e2):
    OrEvent(e1, e2).wait()
于 2012-09-07T14:44:02.293 に答える
5

(ポーリングを使用した) 1つの解決策Eventは、ループ内のそれぞれで順次待機を実行することです。

def wait_for_either(a, b):
    while True:
        if a.wait(tunable_timeout):
            break
        if b.wait(tunable_timeout):
            break

タイムアウトを十分に調整すれば、結果は大丈夫だと思います。


私が考えることができる最良の非ポーリングは、異なるスレッドでそれぞれをEvent待機し、メインスレッドで待機する共有を設定することです。

def repeat_trigger(waiter, trigger):
    waiter.wait()
    trigger.set()

def wait_for_either(a, b):
    trigger = threading.Event()
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger))
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger))
    ta.start()
    tb.start()
    # Now do the union waiting
    trigger.wait()

非常に興味深いので、以前のソリューションのOOPバージョンを作成しました。

class EventUnion(object):
    """Register Event objects and wait for release when any of them is set"""
    def __init__(self, ev_list=None):
        self._trigger = Event()
        if ev_list:
            # Make a list of threads, one for each Event
            self._t_list = [
                Thread(target=self._triggerer, args=(ev, ))
                for ev in ev_list
            ]
        else:
            self._t_list = []

    def register(self, ev):
        """Register a new Event"""
        self._t_list.append(Thread(target=self._triggerer, args=(ev, )))

    def wait(self, timeout=None):
        """Start waiting until any one of the registred Event is set"""
        # Start all the threads
        map(lambda t: t.start(), self._t_list)
        # Now do the union waiting
        return self._trigger.wait(timeout)

    def _triggerer(self, ev):
        ev.wait()
        self._trigger.set()
于 2012-09-07T12:39:06.727 に答える
1

余分なスレッドを開始することは明確な解決策のようですが、あまり効率的ではありません。関数 wait_events は、いずれかのイベントが設定されている場合にブロックします。

def wait_events(*events):
    event_share = Event()

    def set_event_share(event):
        event.wait()
        event.clear()
        event_share.set()
    for event in events:
        Thread(target=set_event_share(event)).start()

    event_share.wait()

wait_events(event1, event2, event3)
于 2013-10-12T08:42:15.307 に答える
1

これは古い質問ですが、これが Google から来た人に役立つことを願っています。
受け入れられた回答はかなり古いものであり、2 回の "orified" イベントで無限ループが発生します。

を使用した実装を次に示します。concurrent.futures

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def wait_for_either(events, timeout=None, t_pool=None):
    '''blocks untils one of the events gets set

    PARAMETERS
    events (list): list of threading.Event objects
    timeout (float): timeout for events (used for polling)
    t_pool (concurrent.futures.ThreadPoolExecutor): optional
    '''

    if any(event.is_set() for event in events):
        # sanity check
        pass
    else:
        t_pool = t_pool or ThreadPoolExecutor(max_workers=len(events))
        tasks = []
        for event in events:
            tasks.append(t_pool.submit(event.wait))

        concurrent.futures.wait(tasks, timeout=timeout, return_when='FIRST_COMPLETED')
        # cleanup
        for task in tasks:
            try:
                task.result(timeout=0)
            except concurrent.futures.TimeoutError:
                pass

機能のテスト

import threading
import time
from datetime import datetime, timedelta

def bomb(myevent, sleep_s):
    '''set event after sleep_s seconds'''
    with lock:
        print('explodes in ', datetime.now() + timedelta(seconds=sleep_s))
    time.sleep(sleep_s)
    myevent.set()
    with lock:
        print('BOOM!')

lock = threading.RLock()  # so prints don't get jumbled
a = threading.Event()
b = threading.Event()

t_pool = ThreadPoolExecutor(max_workers=2)

threading.Thread(target=bomb, args=(event1, 5), daemon=True).start()
threading.Thread(target=bomb, args=(event2, 120), daemon=True).start()

with lock:
    print('1 second timeout, no ThreadPool', datetime.now())

wait_for_either([a, b], timeout=1)

with lock:
    print('wait_event_or done', datetime.now())
    print('=' * 15)

with lock:
    print('wait for event1', datetime.now())

wait_for_either([a, b], t_pool=t_pool)

with lock:
    print('wait_event_or done', datetime.now())
于 2018-06-29T17:49:23.390 に答える
0

きれいではありませんが、2 つの追加スレッドを使用してイベントを多重化できます...

def wait_for_either(a, b):
  flag = False #some condition variable, event, or similar

  class Event_Waiter(threading.Thread):
    def __init__(self, event):
        self.e = event
    def run(self):
        self.e.wait()
        flag.set()

  a_thread = Event_Waiter(a)
  b_thread = Event_Waiter(b)
  a.start()
  b.start()
  flag.wait()

到着が早すぎると、誤って両方のイベントを取得することを心配する必要がある場合があることに注意してください。ヘルパー スレッド (a_thread および b_thread) は、フラグを設定しようとして同期をロックし、他のスレッドを強制終了する必要があります (そのスレッドのイベントが消費された場合は、そのスレッドのイベントをリセットする可能性があります)。

于 2012-09-07T13:46:52.860 に答える