3

MQTT ブローカーに接続しようとしています。接続に失敗した場合に備えて再試行したい。接続の成功または失敗でコールバックを受け取ります。

retryWhen の複数の例を読み、非同期コールバックを処理した後、このコードをまとめました。接続に成功すれば問題なく動作します。e.onError(throwable)また、から同期的に呼び出すと、3 回リトライしFlowableます。e.onError(throwable)しかし、コールバックのonFailure()メソッドから呼び出すと、Android アプリがクラッシュします。

コードは次のとおりです。

RxJavaチェーン

createConnectionFlowable(client, options)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .retryWhen(createRetryFunction())
    .subscribe(createConsumer());

Flowable を作成する

private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<String>() {

        public void subscribe(final FlowableEmitter<String> e) throws Exception {
                client.connect(options).setActionCallback(new IMqttActionListener() {
                    public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
                    public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
                });
        }
    }, BackpressureStrategy.BUFFER);
}

リトライ機能を作る

private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
    return new Function<Flowable<Throwable>, Publisher<?>>() {

        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.zipWith(
                    Flowable.range(1, 3),
                    new BiFunction<Throwable, Integer, Integer>() {
                        public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
                    }
            )
            .flatMap(new Function<Integer, Publisher<?>>() {
                public Publisher<?> apply(Integer integer) throws Exception {
                    return Flowable.timer(integer, TimeUnit.SECONDS);
                }
            });
        }
    };
}

消費者: ここですべての良いことを行う

private Consumer<String> createConsumer() {
    return new Consumer<String>() {
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: do important stuff here" + s);
        }
    };
}

エラーログ

12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
                                                                     Process: com.work.app, PID: 16769
                                                                     Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: 

質問

  1. このコードがアプリをクラッシュさせる例外をスローするのはなぜですか? 理想的には、例外を処理する必要がありますか? ここで何が欠けていますか?
  2. 3回再試行しないのはなぜですか?
  3. メソッドe.onError(throwable)から同期的に呼び出すと、同じコードが正しく再試行されるのはなぜですか?Flowable.subscribe()

参考文献

  1. RxJava 1.x retryWhen ドキュメント
  2. このブログ
4

2 に答える 2

1
  1. subscribe使用してConsumer<String>いるため、ストリームのエラーハンドラーを定義していません。これは、エラーが を介してデフォルトのエラー ハンドラに渡されることを意味しますRxJavaPlugins.getErrorHandler().handleError(...)。Android では、このハンドラーは致命的なエラーを引き起こすようです。これを修正するには、Observer<String>代わりにa を使用しますConsumer<String>
  2. ログは、クライアントが Rx 以外で 3 回失敗した (「onFailure」が 3 回言及されている) ことを示しているようです。クライアントがステートフルである可能性があると推測しなければならなかった場合、つまり、最初の接続フォローアップ呼び出しの後、client.connect(...)問題を引き起こす何らかの奇妙な動作を示すことを意味します。ログに示さerror - 1 sec wait - error, errorれているので、コールバックがアクティブなままであるため、2 番目の失敗が RxJava に 2 回送信されると思います。
  3. 同期について話しているときにメソッドについて話していると仮定するとwaitForCompletion()、2 の私の仮定がサポートされます。コールバックが登録されていないため、各スロー可能オブジェクトは 1 回だけ報告され、動作が修正されます。

なぜエミッターが終了後も機能し続けるのか (onError/onComplete) はわかりませんが、仕様ではこれらのメソッドは 1 回だけ呼び出されることが義務付けられているため、この問題の原因となる不特定の動作である可能性があります。

于 2016-12-20T19:21:26.697 に答える
0

私はついにこれを機能させました!

これは RxJava2 の問題ではなく、IMqttActionListenerクライアントが別のスレッドで作成されたとしても、Mqtt (Eclipse Paho)​​ がメイン スレッドでコールバックを実行する方法に問題があることが判明しました!!! .

これに対する簡単な解決策は、クライアントが作成されたスレッドにクライアントが接続するのを待つことです。この方法を除いて、質問で共有されているコードは正しいです

@NonNull
public Flowable<Boolean> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final FlowableEmitter<Boolean> e) throws Exception {
            IMqttToken connect = client.connect(options);
            connect.waitForCompletion(); //this is blocking and is what was required!!
            if (client.isConnected()) {
                e.onNext(true);
                e.onComplete();
            } else {
                e.onError(connect.getException());
            }

        }
    }, BackpressureStrategy.BUFFER);
}

これがこれらのライブラリで作業する誰かに役立つことを願っています:)

于 2016-12-27T16:24:42.643 に答える