2

ニーズ: TCP 接続を使用した長期実行プログラム

AC# 4.0 (VS1010、XP) プログラムは、TCP を使用してホストに接続し、バイトを送受信し、場合によっては接続を適切に閉じてから再度開く必要があります。周囲のコードは Rx.NetObservableスタイルを使用して記述されています。データの量は少ないですが、プログラムは継続的に実行する必要があります (リソースを適切に破棄することでメモリ リークを回避します)。

以下のテキストは、私が検索して見つけたものを説明するため、長いです。これで動作するようになりました。

全体的な質問は次のとおりです。Rx は直感的でない場合があるため、解決策は適切ですか? それは信頼できますか (たとえば、何年も問題なく動作しますか)?

これまでの解決策

送信

NetworkStreamプログラムは次のように取得します。

TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;

tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();

非同期送信は簡単です。Rx.Net では、従来のソリューションよりもはるかに短くクリーンなコードでこれを処理できます。で専用スレを立てましたEventLoopScheduler。送信が必要な操作は、 を使用して表現されIObservableます。ObserveOn(sendRecvThreadScheduler)すべての送信操作がそのスレッドで行われるという保証を使用します。

sendRecvThreadScheduler = new EventLoopScheduler(
    ts =>
    {
        var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
        return thread;
    });
        // Loop code for sending not shown (too long and off-topic).

これまでのところ、これは優れており、完璧です。

受け取る

Rx.Net は、データを受信するために、従来のソリューションよりも短くてクリーンなコードも許可する必要があるようです。いくつかのリソース ( http://www.introtorx.com/など) と stackoverflow を読んだ後、非常に簡単な解決策は、https: //stackoverflow.com/a/14464068 のように非同期プログラミングを Rx.Net にブリッジすることです。 /1429390 :

public static class Ext
{
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead,
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

それは主に動作します。バイトを送受信できます。

閉店時間

これは、物事がうまくいかなくなるときです。

ストリームを閉じて、物事をきれいに保つ必要がある場合があります。基本的にこれは、読み取りを停止し、バイト受信オブザーバブルを終了し、新しい接続で新しい接続を開くことを意味します。

たとえば、接続がリモート ホストによって強制的に閉じられると、BeginRead()/EndRead()すぐにすべての CPU を消費するループがゼロ バイトを返します。高レベルのコードにこれ (高レベルの要素が利用可能なコンテキストでSubscribe()toを使用) とクリーンアップ (ストリームのクローズと破棄を含む) を認識させます。ReadObservableこれもうまく機能し、 によって返されたオブジェクトの破棄を処理しますSubscribe()

    someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
    {
        if (buf.Length == 0)
        {
                MyLoggerLog("Read explicitly returned zero bytes.  Closing stream.");
                this.pscDestroyIfAny();
        }
    });

時々、ストリームを閉じる必要があります。しかし、明らかにこれにより、非同期読み取りで例外がスローされる必要があります。c# - BeginRead と BeginWrite を途中で中止する適切な方法は? - スタックオーバーフロー

シーケンスを終了さCancellationTokenせるを追加しました。長時間スリープする可能性があるため、Observable.While()これはこれらの例外を回避するのにあまり役立ちません。BeginRead()

オブザーバブルの未処理の例外により、プログラムが終了しました。提供された .net の検索- 例外の後もサブスクリプションを使用し続ける - スタック オーバーフローは、壊れたものを空のもので効果的に再開するCatchを追加することを提案しました。Observable

コードは次のようになります。

public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
    // to hold read data
    var buffer = new byte[bufferSize];
    // Step 1: async signature => observable factory
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead,
        stream.EndRead);
    return Observable.While(
        // while there is data to be read
        () =>
        {
            return (!token.IsCancellationRequested) && stream.CanRead;
        },
        // iteratively invoke the observable factory, which will
        // "recreate" it such that it will start from the current
        // stream position - hence "0" for offset
        Observable.Defer(() =>
            {
                if ((!token.IsCancellationRequested) && stream.CanRead)
                {
                    return asyncRead(buffer, 0, bufferSize);
                }
                else
                {
                    return Observable.Empty<int>();
                }
            })
            .Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
        .Select(readBytes => buffer.Take(readBytes).ToArray()));
}

今何?質問

これはうまくいくようです。リモート ホストが接続を強制的に閉じた、または到達できなくなったという条件が検出され、上位レベルのコードが接続を閉じて再試行します。ここまでは順調ですね。

物事が完全に正しいと感じるかどうかはわかりません。

一つには、その行:

.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

命令型コードでの空の catch ブロックの悪い習慣のように感じます。実際のコードは例外をログに記録し、上位レベルのコードは応答がないことを検出して正しく処理するため、かなり問題ないと見なす必要があります (以下を参照)。

.Catch((Func<Exception, IObservable<int>>)(ex =>
{
    MyLoggerLogException("On asynchronous read from network.", ex);
    return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

また、これは実際、ほとんどの従来のソリューションよりも短くなっています。

解決策は正しいですか、それともよりシンプルでクリーンな方法を見逃していましたか?

Reactive Extensions のウィザードにとって明らかな恐ろしい問題はありますか?

ご清聴ありがとうございました。

4

0 に答える 0