3

処理シーケンスを分割する必要があります (この質問のように、 .net RX を使用してデータ プロセッサのシーケンスを整理する方法)、Azure 環境で複数の計算ユニットに分割する必要があります。
アイデアは、Observable シーケンスを Azure Queues (または Service Bus) にシリアル化し、それを逆シリアル化することです。
生産者または消費者が失敗した場合、他の当事者は生産/消費を継続できるはずです。

そのためのエレガントな方法と使用するもの (Azure Queues または Service Bus) を提案できる人はいますか?
TCP Observable プロバイダー - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Providerをそのような問題に使用した人はいますか?

4

2 に答える 2

2

次の API を持つメッセージ キューがあるとします。

class MQ {

    public MQ();

    // send a single message from your message queue
    public void send(string keyPath, string msg);

    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);

}

このRXを対応させるには

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath

    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }

    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }

    void OnNext(string msg){
        _mq.send(msg);
    }

    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

フォールト トレラントな方法で使用するため。OnError によって配信された上流でスローされた例外がある場合、再試行は再サブスクライブすることに注意してください

new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

たとえば、書き込み側では、2 秒ごとにメッセージを送信し、エラーが発生した場合はジェネレーターへのサブスクリプションを再試行できます。時間間隔ごとにメッセージを生成するだけの Observable.Interval でエラーが発生する可能性は低いことに注意してください。ただし、ファイルまたはその他のメッセージ キューからの読み取りを想像してみてください。

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

同じエラーが何度も発生する可能性があるため、やみくもに再試行するのではなく、おそらく IObservable Catch 拡張メソッドを使用する必要があることに注意してください。リトライ()。サブスクライブ (mq);

于 2012-11-13T13:20:51.990 に答える
0

30 行の VB コードで独自の UDP to RX ラッパーを作成し、タイムアウトを処理しました。TCPラッパーも似ていると思います。

Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets

Public Module UDP
    ''' <summary>
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
    ''' timeout value between results.
    ''' </summary>
    ''' <param name="timeout"></param>
    ''' <returns></returns>
    ''' <remarks></remarks>
    Public Function StreamObserver(
                localPort As Integer, 
                Optional timeout As Nullable(Of TimeSpan) = Nothing
                ) As IObservable(Of UdpReceiveResult)

        Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
            Function() New UdpClient(localPort),
            Function(udpClient)
                Dim o = Observable.Defer(Function() udpClient.
                                                                ReceiveAsync().
                                                                ToObservable())
                If Not timeout Is Nothing Then
                    Return Observable.Timeout(o.Repeat(), timeout.Value)
                Else
                    Return o.Repeat()
                End If
            End Function
        )
    End Function
End Module

必要に応じて、書き込み側でも同じことができます。次に、通常の RX 手法を使用して、データを UDP フレームにシリアル化します。

new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
        Select( function(udpResult) MyDeserializeFunction(udpResult)).
        Subscribe( sub (result) DoSomething(result), 
                   sub(error) DoSomethingWithError ) 
于 2012-11-12T12:56:21.887 に答える