アプリには、周辺機器で実行する BLE 操作のキューが事実上あります。各操作は、ペリフェラルへの接続を確立することから始まります。ペリフェラルは を返しますObservable<RxBleConnection>
。キュー内の最初のアイテムが接続を開始し、後続の操作はこれ (shared) を受け取るだけRxBleConnection
です。
簡単に言えば、キューは次の方法で実行されます。
Observable.concatDelayError(queuedOperations)
接続を確立できない場合、または 1 つの操作中に切断された場合、残りのキューに入れられた操作はそれぞれ再試行され、接続の再確立が試みられます。
RxBleConnection
接続が無効になると、キューに入れられた操作が新しい接続を再確立するのではなく、常に無効になったものを受け取るように動作を変更することにしました。再試行ロジックは引き続き実行されますが、これらのインスタンスではすぐに失敗します --- 接続とは関係なく、操作が失敗する可能性がある他の理由があります。
この動作を生成するために、 の直後に を構成しPublishSubject
ますObservable<RxBleConnection>
。このサブジェクトは元のサブジェクトを委任するだけですRxBleConnection
--- 以下のコードを参照してください。接続がエラー状態に達すると、サブジェクトへの後続のサブスクリプションでエラーが発生します。それ以外の場合は、共有接続が返されます。これはまさに私が望んでいた動作であり、エラーが発生したときに設計どおりに動作するように見えます。ただし、すべてが成功すると、現在問題が発生しています。
変更前は、キュー内のすべての操作が消費されると、接続が自動的に解放されました。ただし、を追加するとPublishSubject
、操作は成功しますが、接続は開いたままになります。デバッグ ステートメントを使用して、サブジェクトのonUnsubscribe
andonTerminate
が呼び出されないことを確認しました。オリジナルRxBleConnection
は最終的にタイムアウトになります --- そしてそのonUnsubscribe
とonTerminate
が呼び出されます。
アプリが周辺機器に接続されたままになっている原因は、私が間違って何をしているのか疑問に思っていました.
private Observable.Transformer<RxBleConnection, RxBleConnection> createConnectionSubject() {
return rxBleConnectionObservable -> {
final PublishSubject<RxBleConnection> subject = PublishSubject.create();
rxBleConnectionObservable.subscribe(
subject::onNext,
subject::onError,
subject::onCompleted);
return subject;
};
}