1

私はRxJavaを実行しており、onNext()メソッドを使用してデータを生成するサブジェクトを作成しています。私はを使用しています。

これは私のセットアップです:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}

RxJava ストリームで新しいデータを生成する方法は@Autowire private SubjectObserver subjectObserver 、呼び出してから呼び出すことです。subjectObserver.publish(newDataObjGenerated)

subscribeOn()&に何を指定してもobserveOn():

  • Schedulers.io()
  • スケジューラー.計算()
  • 私のスレッド
  • Schedulers.newThread

とそのonNext()内部の実際の作業はonNext()、対象の を実際に呼び出してデータを生成/生成するのと同じスレッドで行われます。

これは正しいです?もしそうなら、私は何が欠けていますか? doSomething()別のスレッドで行われることを期待していました。

アップデート

私の呼び出しクラスでは、メソッドを呼び出す方法を変更するとpublish、もちろん、サブスクライバーが実行される新しいスレッドが割り当てられます。

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));

ありがとう、

4

1 に答える 1

8

Observable/の各演算子はSubject、追加の動作を備えた新しいインスタンスを返しますが、コードは を適用するだけで、生成して rawsubscribeOnobserveOnサブスクライブするものをすべて破棄しSubjectます。メソッド呼び出しをチェーンしてからサブスクライブする必要があります。

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();

safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
     @Override
     public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
         LOGGER.debug("{} invoked.", Thread.currentThread().getName());
         doSomething();
     }
});

メソッドでサブスクリプションの副作用が発生しないためsubscribeOn、 a には実質的な影響がないことに注意してください。PublishSubjectsubscribe()

于 2016-10-25T21:39:19.037 に答える