6

注:この質問の長さについてお詫び申し上げます。多くの情報を入力する必要がありました。それがあまりにも多くの人々が単にそれをすくい取って仮定をすることを引き起こさないことを願っています。全部お読みください。ありがとう。

ソケットを介して入ってくるデータのストリームがあります。このデータは行指向です。

.NET(BeginReadなど)のAPM(非同期プログラミング方式)を使用しています。非同期I/Oはバッファベースであるため、これによりストリームベースのI/Oを使用できなくなります。データを再パッケージ化してメモリストリームなどのストリームに送信することは可能ですが、そこにも問題があります。

問題は、私の入力ストリーム(私は制御できません)がストリームの長さに関する情報を私に提供しないことです。これは、次のような改行行のストリームです。

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

したがって、APMを使用すると、特定のデータセットの長さがわからないため、データのブロックがバッファの境界を越えて複数の読み取りが必要になる可能性がありますが、これらの複数の読み取りも複数のデータブロックにまたがります。

例:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

私が最初に考えたのは、StringBuilderを使用して、SBにバッファー行を追加することでした。これはある程度は機能しますが、データのブロックを抽出するのは難しいことがわかりました。StringReaderを使用して新しい行のデータを読み取ろうとしましたが、StringReaderが最後に追加されたブロックの最後に部分的な行を返し、その後にnullを返すため、完全な行を取得しているかどうかを知る方法がありませんでした。返されたものが完全に新しいデータ行であったかどうかを知る方法はありません。

例:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

さらに悪いことに、データに追加し続けると、バッファがどんどん大きくなり、これは一度に数週間または数か月実行される可能性があるため、適切なソリューションではありません。

私の次の考えは、データのブロックを読んでいるときにSBからデータのブロックを削除することでした。これには独自のReadLine関数を作成する必要がありましたが、読み取りと書き込み中にデータをロックするのに行き詰まりました。また、データのより大きなブロック(数百の読み取りとメガバイトのデータで構成される可能性があります)では、バッファー全体をスキャンして改行を探す必要があります。それは効率的ではなく、かなり醜いです。

非同期I/Oの便利さを備えたStreamReader/Writerのシンプルさを備えたものを探しています。

私の次の考えは、MemoryStreamを使用し、データのブロックをメモリストリームに書き込み、次にStreamReaderをストリームに接続してReadLineを使用することでしたが、バッファーでの最後の読み取りが完全な行であるかどうかを知ることに問題があります。さらに、ストリームから「古い」データを削除するのはさらに困難です。

また、同期読み取りでスレッドを使用することも考えました。これには、StreamReaderを使用すると、接続が切断された場合を除いて、ReadLine()から常にフルラインが返されるという利点があります。ただし、これには接続のキャンセルに関する問題があり、特定の種類のネットワークの問題により、ブロッキングソケットが長時間ハングする可能性があります。データ受信をブロックしているプログラムの存続期間中、スレッドを拘束したくないので、非同期IOを使用しています。

接続は長続きします。そして、データは時間の経過とともに流れ続けます。最初の接続中は大量のデータフローがあり、そのフローが完了すると、ソケットは開いたままになり、リアルタイムの更新を待機します。最初のフローがいつ「終了」したかは正確にはわかりません。これ以上データがすぐに送信されないことを知る唯一の方法だからです。これは、最初のデータの読み込みが完了するのを待ってから処理することができないことを意味します。処理が開始されると、「リアルタイム」で処理が滞ります。

それで、誰かがこの状況を過度に複雑にならない方法で処理するための良い方法を提案できますか?私はこれをできるだけシンプルでエレガントにしたいと思っていますが、すべてのエッジケースのために、ますます複雑なソリューションを考え続けています。私が欲しいのは、特定の基準(つまり、改行で終了する文字列)に一致するデータをポップすると同時に、より多くのデータを簡単に追加し続けることができる、ある種のFIFOだと思います。

4

2 に答える 2

5

とても興味深い質問です。あなたが提案したように、過去の私にとっての解決策は、同期操作で別のスレッドを使用することでした。(ロックと多くの例外ハンドラーを使用してソケットをブロックすることに関する問題のほとんどを回避することができました。)それでも、組み込みの非同期操作を使用することは、真の OS レベルの非同期 I/O を可能にするため、通常は推奨されます。あなたのポイント。

さて、私はあなたが必要だと私が信じていることを達成するためのクラスを書きました(比較的きれいな方法で)。どう考えているか教えてください。

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

このクラスのインスタンスは NetworkStream ごとに作成する必要があり、新しいデータを受信するたびに Process 関数を呼び出す必要があります (次の BeginRead を呼び出す前に、BeginRead のコールバック メソッドで)。

注: ネットワーク経由で送信された実際のデータではなく、テスト データを使用してこのコードを検証しただけです。しかし、私は違いを予想していません...

また、クラスはもちろんスレッドセーフではないという警告ですが、BeginRead が現在のデータが処理されるまで再実行されない限り (私はあなたがやっていると思います)、問題はないはずです。

これがうまくいくことを願っています。問題が残っている場合はお知らせください。解決策を修正して対処します。(注意深く読んだにもかかわらず、私が見逃した質問の微妙な点があるかもしれません!)

于 2009-02-08T01:33:22.243 に答える
0

質問で説明していることは、ASCIZ文字列を非常に思い出させます。(リンクテキスト)。それは役に立つスタートかもしれません。

私が取り組んでいたプロジェクトのために、大学でこれに似たものを書かなければなりませんでした。残念ながら、送信ソケットを制御できたので、プロトコルの一部として長さのメッセージ フィールドを挿入しました。ただし、同様のアプローチが役立つと思います。

私の解決策は、5HELLO のようなものを送信することでした。最初に 5 が表示され、メッセージの長さが 5 であることを知り、必要なメッセージは 5 文字でした。ただし、非同期読み取りで 5HE しか得られなかった場合、メッセージの長さは 5 であることがわかりますが、ネットワークから 3 バイトしか読み取ることができませんでした (ASCII 文字を想定してみましょう)。このため、いくつかのバイトが欠落していることを知り、持っていたものをフラグメント バッファに保存しました。ソケットごとに 1 つのフラグメント バッファがあったため、同期の問題を回避できました。大まかな工程は。

  1. ソケットからバイト配列に読み取り、読み取ったバイト数を記録します
  2. 改行文字が見つかるまで、バイト単位でスキャンします (ASCII 文字を受信して​​いない場合、これは非常に複雑になりますが、複数のバイトになる可能性のある文字は、あなた自身です)
  3. you're frag buffer を文字列に変換し、新しい行まで you're read buffer を追加します。この文字列を完了したメッセージとしてキューにドロップするか、独自のデリゲートを処理します。(実際にフラグメントと同じバイト配列に書き込むソケットを読み取ることで、これらのバッファーを最適化できますが、それを説明するのは難しいです)
  4. ループを続け、新しい行が見つかるたびに、記録された開始/終了位置から配列されたバイトから文字列を作成し、処理のためにキュー/デリゲートにドロップします。
  5. 読み取りバッファーの最後に到達したら、残っているものをすべてフラグ バッファーにコピーします。
  6. ソケットで BeginRead を呼び出します。ソケットでデータが使用可能になると、ステップ 1 にジャンプします。

次に、別のスレッドを使用して、incommign メッセージのキューを読み取るか、デリゲートを使用して Threadpool に処理させます。そして、あなたがしなければならないあらゆるデータ処理を行います。私が間違っている場合は誰かが私を修正しますが、これにはスレッド同期の問題はほとんどありません。一度にソケットからの読み取りまたは読み取りの待機しかできないため、ロックについて心配する必要はありません (キューにデータを入力するために、実装でデリゲートを使用しました)。自分で解決する必要がある詳細がいくつかあります。たとえば、残すフラグ バッファの大きさなどです。読み取りを行うときに改行が 0 の場合、メッセージ全体を上書きせずにフラグメント バッファに追加する必要があります。なんでも。最終的には約 700 ~ 800 行のコードを実行したと思いますが、それには接続のセットアップ、暗号化のネゴシエーション、

このセットアップは私にとって非常にうまく機能しました。暗号化処理を含む1.8Ghzオプテロンの実装で、100MbpsのイーサネットLANで最大80Mbpsを実現できました。また、ソケットに関連付けられているため、複数のソケットを同時に処理できるため、サーバーは拡張されます。アイテムを順番に処理する必要がある場合は、キューを使用する必要がありますが、順序が重要でない場合は、デリゲートを使用すると、スレッドプールから非常にスケーラブルなパフォーマンスが得られます。

これが役立つことを願っています。完全な解決策ではなく、探し始める方向性です。

*注意してください。私の実装は純粋にバイトレベルでダウンしており、暗号化をサポートしていました。視覚化を容易にするために例に文字を使用しました。

于 2009-02-08T01:13:51.800 に答える