1

Stomp over Websockets 用の小さな RxJS ラッパーを構築していますが、これは既に動作しています。

しかし今、私は本当にクールな機能のアイデアを思いつきました.RxJSを使用して簡単に実行できる可能性があります.

現在の動作:

myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();      // onSuccess: set state to CONNECTED

// state (Observable) can be DISCONNECTED or CONNECTED
var subscription = myStompWrapper.getState()
    .filter(state => state == "CONNECTED")
    .flatMap(myStompWrapper.subscribeDestination("/foo"))
    .subscribe(msg => console.log(msg));

// ... and some time later:
subscription.unsubscribe();    // calls 'unsubscribe' for this stomp destination
myStompWrapper.disconnect();   // disconnects the stomp websocket connection

ご覧のとおり、に登録するには を待たなければなりません。そうしないと、Stomp ライブラリからエラーが発生します。state == "CONNECTED"subscribeDestination(..)

新しい動作:

次の実装では、ユーザーにとって物事がより簡単になるはずです。これが私が想像するものです:

myStompWrapper.configure("/stomp_endpoint");

var subscription = myStompWrapper.subscribeDestination("/foo")
    .subscribe(msg => console.log(msg));

// ... and some time later:
subscription.unsubscribe();

内部でどのように動作するか:

  1. configure間のみ呼び出すことができますDISCONNECTED
  2. subscribeDestination呼び出されると、次の 2 つの可能性があります。
    1. if CONNECTED: 宛先にサブスクライブするだけ
    2. if DISCONNECTED: 最初に呼び出しconnect()、次に宛先にサブスクライブする
  3. unsubscribe呼び出されると、次の 2 つの可能性があります。
    1. これが最後のサブスクリプションの場合: 呼び出しdisconnect()
    2. これが最後のサブスクリプションでない場合: 何もしない

そこにたどり着く方法はまだわかりませんが、それがここでこの質問をする理由です;-)

前もって感謝します!

編集:より多くのコード、例、説明

切断されていないときにconfigure()が呼び出されると、 . しかし、それは大したことではありません。Error

stompClient.connect(..)はノンブロッキングです。onSuccessコールバックがあります:

public connect() {
  stompClient.connect({}, this.onSuccess, this.errorHandler);
}

public onSuccess = () => {
  this.state.next(State.CONNECTED);
}

observeDestination(..)は、Stomp メッセージ チャネル (= 宛先) にサブスクライブし、Rx.Observableこの Stomp メッセージ チャネルからサブスクライブ解除するために使用できる を返します。

public observeDestination(destination: string) {
  return this.state
      .filter(state => state == State.CONNECTED)
      .flatMap(_ => Rx.Observable.create(observer => {
        let stompSubscription = this.client.subscribe(
            destination,
            message => observer.next(message),
            {}
        );

        return () => {
          stompSubscription.unsubscribe();
        }
      }));
}

次のように使用できます。

myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();

myStompWrapper.observeDestination("/foo")
    .subscribe(..);

myStompWrapper.observeDestination("/bar")
    .subscribe(..);

今、私は取り除きたいと思いますmyStompWrapper.connect()this.connect()コードは、最初のコードが呼び出しによってサブスクライブしたときに自動的に呼び出され、最後のコードが呼び出されたときに呼び出すobserveDestination(..).subscribe(..)必要があります。this.disconnect()unsubscribe()

例:

myStompWrapper.configure("/stomp_endpoint");

let subscription1 = myStompWrapper.observeDestination("/foo")
    .subscribe(..); // execute connect(), because this
                    // is the first subscription

let subscription2 = myStompWrapper.observeDestination("/bar")
    .subscribe(..);

subscription2.unsubscribe();
subscription1.unsubscribe(); // execute disconnect(), because this 
                             // was the last subscription
4

1 に答える 1