次の API を持つメッセージ キューがあるとします。
class MQ {
public MQ();
// send a single message from your message queue
public void send(string keyPath, string msg);
// Receive a single message from your message queue
public async Task<string> receive(keyPath);
}
このRXを対応させるには
class MQRX: IObserver<string> {
MQ _mq;
string _keyPath
MQRX(string keyPath){
_mq = mq;
_keyPath = keyPath;
}
IObservable<string> Observe(){
return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
}
void OnNext(string msg){
_mq.send(msg);
}
void OnError(Exception e){
// The message queue might not
// support serializing exceptions
// or it might or you might build
// a protocol for it.
}
}
フォールト トレラントな方法で使用するため。OnError によって配信された上流でスローされた例外がある場合、再試行は再サブスクライブすることに注意してください
new MQRX("users/1/article/2").
Retry().
Subscribe((msg)=>Console.Writeln(msg));
たとえば、書き込み側では、2 秒ごとにメッセージを送信し、エラーが発生した場合はジェネレーターへのサブスクリプションを再試行できます。時間間隔ごとにメッセージを生成するだけの Observable.Interval でエラーが発生する可能性は低いことに注意してください。ただし、ファイルまたはその他のメッセージ キューからの読み取りを想像してみてください。
var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
Select((x)=>x.ToString()).
同じエラーが何度も発生する可能性があるため、やみくもに再試行するのではなく、おそらく IObservable Catch 拡張メソッドを使用する必要があることに注意してください。リトライ()。サブスクライブ (mq);