私はRxJavaを実行しており、onNext()
メソッドを使用してデータを生成するサブジェクトを作成しています。私は春を使用しています。
これは私のセットアップです:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
RxJava ストリームで新しいデータを生成する方法は@Autowire private SubjectObserver subjectObserver
、呼び出してから呼び出すことです。subjectObserver.publish(newDataObjGenerated)
subscribeOn()
&に何を指定してもobserveOn()
:
- Schedulers.io()
- スケジューラー.計算()
- 私のスレッド
- Schedulers.newThread
とそのonNext()
内部の実際の作業はonNext()
、対象の を実際に呼び出してデータを生成/生成するのと同じスレッドで行われます。
これは正しいです?もしそうなら、私は何が欠けていますか? doSomething()
別のスレッドで行われることを期待していました。
アップデート
私の呼び出しクラスでは、メソッドを呼び出す方法を変更するとpublish
、もちろん、サブスクライバーが実行される新しいスレッドが割り当てられます。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
ありがとう、