7

レトロフィットの RxJava サポートを使用してオブザーバブルを連鎖させることに問題があります。私はおそらくそれを使用する方法を誤解しています, そうでなければ、それは改造のバグかもしれません. ここの誰かが何が起こっているのかを理解するのを手伝ってくれることを願っています。編集:私はこれらの応答に MockRestAdapter を使用しています - RxSupport の実装がわずかに異なるため、これは関連している可能性があります。

これは偽の銀行アプリです。送金を行おうとしています。送金が完了したら、アカウントの値を更新するためにアカウント リクエストを実行する必要があります。これは基本的に、flatMap を試してみる言い訳にすぎません。残念ながら、次のコードは機能しません。サブスクライバーに通知されることはありません。

ケース 1: 改造によって生成された 2 つのオブザーバブルを連鎖させる

転送サービス (注: レトロフィットで生成されたオブザーバブルを返します):

@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
                                             @Field("from_account_number") String fromAccountNumber,
                                             @Field("to_account_number") String toAccountNumber,
                                             @Field("amount") String amount);

アカウント サービス (注: レトロフィットで生成されたオブザーバブルを返します):

@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

改造によって生成された 2 つのオブザーバブルを連結します。

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

ケース 2: 独自のオブザーバブルを作成し、レトロフィットで生成されたものとチェーンする

「フラット マップ」呼び出しに対する Retrofit の組み込み Rx サポートを無視すると、完全に機能します。すべてのサブスクライバーに通知が届きます。下記参照:

新しいアカウント サービス (注: オブザーバブルは生成されません):

@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

独自のオブザーバブルを作成し、アイテムを自分で発行します。

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

どんな助けでも大歓迎です!

4

1 に答える 1

5

答えはイエスです。Retrofit からオブザーバブルをチェーンできるはずです。MockRestAdapter$MockRxSupport:createMockObservable プライベート クラスにバグがあるようです。サブスクライバーをオブザーバブルにサブスクライブすることに関してスケジューリングが行われる方法は間違っているようです。オブザーバブルへのサブスクライブは、HttpExecutor スレッド自体が開始された後に行われます。あなたの Schedulers.io() スレッドからの元のフローが完了し、mockHandler.invokeSync が返された Observable をサブスクライブできるようになる前に、サブスクライブが解除されていると思います。retrofit-mock モジュールのコードを見れば、この説明がある程度理解できると思います。

retrofit-mock のみを使用する場合の現在のコードでの回避策として、内部のデフォルト Executor を独自の ImmediateExecutor 実装に置き換えることができます。これにより、少なくともモックをテストするときに、Schedulers.io によって提供される単一のスレッド フローを持つことができます。

// ImmediateExecutor.java
public class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

// Create your RestAdapter with your ImmdiateExecutor
RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();

ソースでの問題を修正するには、retrofit-mock プロジェクトをソースとしてプロジェクトに含め、以下のコードを使用して MockRestAdapter$MockRxSupport:createMockObservable メソッドを変更することもできます。ユースケースをテストしましたが、問題は解決しました。

--- MockRestAdapter.java$MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
        final RequestInterceptor interceptor, final Object[] args) {
      return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override public void call(final Subscriber<? super Object> subscriber) {
          try {
            if (subscriber.isUnsubscribed()) return;
            Observable observable =
                (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);

            observable.subscribeOn(Schedulers.from(httpExecutor));

            //noinspection unchecked
            observable.subscribe(subscriber);

          } catch (RetrofitError e) {
            subscriber.onError(errorHandler.handleError(e));
          } catch (Throwable e) {
            subscriber.onError(e);
          }
        }
      });
    }

ここでRetrofit プロジェクトの問題を作成しました。彼らがそれを受け入れるかどうかを確認します。

于 2014-07-15T14:44:20.387 に答える