0

TCP経由でXMLを私にストリーミングするサードパーティシステムがあるとします。送信されたXMLコンテンツの合計(ストリームの1つのメッセージではなく、連結されたメッセージ)は次のようになります。

   <root>
      <insert ....><remark>...</remark></insert>
      <delete ....><remark>...</remark></delete>
      <insert ....><remark>...</remark></insert>
      ....
      <insert ....><remark>...</remark></insert>
   </root>

上記のサンプルのすべての行は、個別に処理可能です。ストリーミングプロセスなので、すべてが到着するまで待つことはできません。コンテンツが到着したときに処理する必要があります。問題は、コンテンツチャンクを任意のポイントでスライスでき、タグが尊重されないことです。このようなフラグメントでコンテンツが到着した場合のコンテンツの処理方法について、良いアドバイスはありますか?

チャンク1:

  <root>
      <insert ....><rem

チャンク2:

                      ark>...</remark></insert>
      <delete ....><remark>...</remark></delete>
      <insert ....><remark>...</rema

チャンクN:

                                    rk></insert>
      ....
      <insert ....><remark>...</remark></insert>
   </root>

編集:

処理速度は問題ではありませんが(リアルタイムのトラブルはありません)、メッセージ全体を待つことはできません。実際には、最後のチャンクが到着することはありません。サードパーティのシステムは、変更が発生するたびにメッセージを送信します。プロセスは決して終了しません、それは決して停止しないストリームです。

4

2 に答える 2

2

この問題についての私の最初の考えは、ストリームからの入力のバッファリングを担当する単純なTextReader派生物を作成することです。このクラスは、XmlReaderにフィードするために使用されます。TextReader派生物は、着信コンテンツをかなり簡単にスキャンして、XMLの完全な「ブロック」(開始ブラケットと終了ブラケット、テキストフラグメント、完全な属性などを含む完全な要素)を探すことができます。また、呼び出し元のコードにフラグを提供して、1つ以上の「ブロック」が使用可能になるタイミングを示し、XmlReaderから次のXMLノードを要求できるようにすることもできます。これにより、TextReader派生物からそのブロックが送信され、バッファーから削除されます。 。

編集:これは簡単で汚い例です。それが完全に機能するかどうかはわかりませんが(テストはしていません)、伝えようとしていたアイデアに出くわします。

public class StreamingXmlTextReader : TextReader
{
    private readonly Queue<string> _blocks = new Queue<string>();
    private string _buffer = String.Empty;
    private string _currentBlock = null;
    private int _currentPosition = 0;

    //Returns if there are blocks available and the XmlReader can go to the next XML node
    public bool AddFromStream(string content)
    {
        //Here is where we would can for simple blocks of XML
        //This simple chunking algorithm just uses a closing angle bracket
        //Not sure if/how well this will work in practice, but you get the idea
        _buffer = _buffer + content;
        int start = 0;
        int end = _buffer.IndexOf('>');
        while(end != -1)
        {
            _blocks.Enqueue(_buffer.Substring(start, end - start));
            start = end + 1;
            end = _buffer.IndexOf('>', start);
        }

        //Store the leftover if there is any
        _buffer = end < _buffer.Length
            ? _buffer.Substring(start, _buffer.Length - start) : String.Empty;

        return BlocksAvailable;
    }

    //Lets the caller know if any blocks are currently available, signaling the XmlReader can ask for another node
    public bool BlocksAvailable { get { return _blocks.Count > 0; } }

    public override int Read()
    {
        if (_currentBlock != null && _currentPosition < _currentBlock.Length - 1)
        {
            //Get the next character in this block
            return _currentBlock[_currentPosition++];
        }
        if(BlocksAvailable)
        {
            _currentBlock = _blocks.Dequeue();
            _currentPosition = 0;
            return _currentBlock[0];
        }
        return -1;
    }
}
于 2011-06-23T13:56:08.793 に答える
0

さらに調査した結果、TCP バッファがいっぱいになるたびに XML ストリームがスライスされていることがわかりました。そのため、実際にはバイト ストリーム内でスライスがランダムに発生し、Unicode 文字内でもカットが発生していました。そのため、パーツをバイトレベルで組み立て、それをテキストに戻す必要がありました。変換が失敗した場合は、次のバイト チャンクを待って、再試行しました。

于 2011-08-29T08:10:42.553 に答える