私は一般的なメッセージを発行するサービスを持っており、それらのメッセージのオブザーバブルを作成しました。これらのメッセージには何でも含めることができ、さまざまなプロトコルをその上に重ねることができます。
これらのメッセージから特定のプロトコルを解釈するために、観測可能な第 2 層を追加したいと考えています。たとえば、メッセージのタイプは「更新」、「エラー」、または「完了」です。「更新」メッセージを再発行し、「エラー」でエラーをスローし、「完了」でシーケンスを完了したいと考えています。
どうすればこれをきれいに達成できますか?
これを行うには使用できないと思いますSelectMany
。セレクターは返すことができますが、最初の 2Observable.Return()
つObservable.Throw()
のケースでは、セレクターから完了する方法はありません (observer.OnCompleted()
基になるオブザーバブルを呼び出してサブスクライブ解除します)。
Observable.Create()
サブスクライブメソッド内で基礎となるオブザーバブルを使用してサブスクライブする必要があるようです。私はそれをしましたが、Rx でより一般的な機能的構成スタイルを使用していないため、実装は奇妙に感じます。それは正しい方法ですか?
public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
return Observable.Create<Message>(observer =>
{
return stream.Subscribe(message =>
{
switch (ProtocolMessageTypeOf(message))
{
case ProtocolMessageType.Error:
observer.OnError(new InvalidOperationException(message));
break;
case ProtocolMessageType.Complete:
observer.OnCompleted();
break;
default:
observer.OnNext(message);
break;
}
});
});
}