1

私は一般的なメッセージを発行するサービスを持っており、それらのメッセージのオブザーバブルを作成しました。これらのメッセージには何でも含めることができ、さまざまなプロトコルをその上に重ねることができます。

これらのメッセージから特定のプロトコルを解釈するために、観測可能な第 2 層を追加したいと考えています。たとえば、メッセージのタイプは「更新」、「エラー」、または「完了」です。「更新」メッセージを再発行し、「エラー」でエラーをスローし、「完了」でシーケンスを完了したいと考えています。

どうすればこれをきれいに達成できますか?

これを行うには使用できないと思いますSelectMany。セレクターは返すことができますが、最初の 2Observable.Return()Observable.Throw()のケースでは、セレクターから完了する方法はありません (observer.OnCompleted()基になるオブザーバブルを呼び出してサブスクライブ解除します)。

Observable.Create()サブスクライブメソッド内で基礎となるオブザーバブルを使用してサブスクライブする必要があるようです。私はそれをしましたが、Rx でより一般的な機能的構成スタイルを使用していないため、実装は奇妙に感じます。それは正しい方法ですか?

public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
  return Observable.Create<Message>(observer =>
  {
    return stream.Subscribe(message =>
    {
      switch (ProtocolMessageTypeOf(message))
      {
        case ProtocolMessageType.Error:
          observer.OnError(new InvalidOperationException(message));
          break;

        case ProtocolMessageType.Complete:
          observer.OnCompleted();
          break;

        default:
          observer.OnNext(message);
          break;
      }
    });
  });
}
4

1 に答える 1