Pythonのリアクティブ拡張機能でのスケジューリングについて頭を悩ませようとしています。subscribe_on
複数のオブザーバブルを並行して処理するために使用したいと思います。これは、オブザーバブルが で作成されている場合は正常に機能しますがjust
、たとえばrange
またはfrom_
が使用されている場合は機能しません。
just
のデフォルトは ですがScheduler.immediate
、他のジェネレータのデフォルトはScheduler.current_thread
です。それが違いを引き起こしますが、私には一貫性がありません。おそらく、私は完全な問題を把握していないからです。
次の例を検討してください。
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)
# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)
observe_on
スケジューラーがジェネレーターに直接渡された場合に動作しますが、オブザーバブルの作成を処理から切り離して、次のようなことを実現したいと思います。
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
def factory_single():
return rx.Observable.just(1).do_action(work)
def factory_multiple():
return rx.Observable.range(2, 4).do_action(work)
def process(factory):
factory().subscribe_on(Scheduler.new_thread).subscribe(finish)
# Creates a new thread (I like)
process(factory_single)
# Runs on MainThread (I don't like)
process(factory_multiple)
私は誤解していsubscribe_on
ますか?私のアプローチは間違っていますか?