5

Pythonのリアクティブ拡張機能でのスケジューリングについて頭を悩ませようとしています。subscribe_on複数のオブザーバブルを並行して処理するために使用したいと思います。これは、オブザーバブルが で作成されている場合は正常に機能しますがjust、たとえばrangeまたはfrom_が使用されている場合は機能しません。

justのデフォルトは ですがScheduler.immediate、他のジェネレータのデフォルトはScheduler.current_threadです。それが違いを引き起こしますが、私には一貫性がありません。おそらく、私は完全な問題を把握していないからです。
次の例を検討してください。

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)

observe_onスケジューラーがジェネレーターに直接渡された場合に動作しますが、オブザーバブルの作成を処理から切り離して、次のようなことを実現したいと思います。

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

私は誤解していsubscribe_onますか?私のアプローチは間違っていますか?

4

1 に答える 1