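I am trying to connect to an MQTT broker and want to retry if the connection fails. The result of the connection attempt, success or failure, is delivered through a callback.
I put this code together after reading several examples of retryWhen and of handling asynchronous callbacks. It works fine when the connection succeeds.
It also retries 3 times when I call e.onError(throwable) synchronously from within the Flowable.
However, when I call e.onError(throwable) from the callback's onFailure() method, the Android app crashes.
The code is as follows.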
RxJava chain
createConnectionFlowable(client, options)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .retryWhen(createRetryFunction())
        .subscribe(createConsumer());
Creating the Flowable
private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<String>() {
        public void subscribe(final FlowableEmitter<String> e) throws Exception {
            client.connect(options).setActionCallback(new IMqttActionListener() {
                public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
            });
        }
    }, BackpressureStrategy.BUFFER);
}
Creating the retry function
private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
    return new Function<Flowable<Throwable>, Publisher<?>>() {
        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.zipWith(
                    Flowable.range(1, 3),
                    new BiFunction<Throwable, Integer, Integer>() {
                        public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
                    }
            )
            .flatMap(new Function<Integer, Publisher<?>>() {
                public Publisher<?> apply(Integer integer) throws Exception {
                    return Flowable.timer(integer, TimeUnit.SECONDS);
                }
            });
        }
    };
}
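To make the intent of the retry function explicit, here is a minimal plain-Java sketch of the policy I am aiming for: up to 3 retries, with retry n delayed by n time units. It is only a blocking model of the desired behaviour, not a description of how retryWhen works internally. RetrySketch and retryWithLinearDelay are hypothetical names that belong to neither RxJava nor Paho.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of RxJava or Paho): a blocking model of the
// policy "retry up to maxRetries times, waiting n * delayUnitMillis before retry n".
final class RetrySketch {

    static <T> T retryWithLinearDelay(Callable<T> action, int maxRetries, long delayUnitMillis)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                // Attempt 0 is the initial try; attempts 1..maxRetries are retries.
                return action.call();
            } catch (Exception error) {
                if (attempt >= maxRetries) {
                    throw error; // retries exhausted: surface the last failure
                }
                // Retry number (attempt + 1) waits (attempt + 1) delay units.
                Thread.sleep((attempt + 1) * delayUnitMillis);
            }
        }
    }
}
```

With maxRetries = 3 and delayUnitMillis = 1000 this corresponds to the 1, 2 and 3 second delays that Flowable.range(1, 3) combined with Flowable.timer is meant to produce in the code above.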
Consumer: do all the good stuff here
private Consumer<String> createConsumer() {
    return new Consumer<String>() {
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: do important stuff here" + s);
        }
    };
}
Error log
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.work.app, PID: 16769
Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms:
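Questions
- Why does this code throw an exception that crashes the app? Ideally the exception should be handled, shouldn't it? What am I missing here?
- Why doesn't it retry 3 times?
- Why does the same code retry correctly when I call e.onError(throwable) synchronously from the method Flowable.subscribe()?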
References