2

ネットワークからデータを読み取り、それを関数に送信して内容を解析しようとしています。メッセージ (xml) のサイズが異なる可能性があるため、メッセージ全体、複数のメッセージ、またはメッセージの一部を読み取ることができました。

BlockingCollectionネットワークからデータを読み取るときに呼び出す場所を使用してコードを実装しTryAdd、コンシューマ スレッドを使用してデータを取り出してBlockingCollection解析しようとしています。例は非常に簡単に見えますが、一度しか機能しないようで、その後終了します。メッセージが入ってくると、消費者が継続的に解析するようにします。私が現在行っていることについては、以下のコードを参照してください。

メッセージの処理:

    private static BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>();

    public XmlHandler()
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        Task.Factory.StartNew(() =>
      {
          if (Console.ReadKey().KeyChar == 'c')
              cts.Cancel();
      });


        Task.Factory.StartNew(() => ParserWorker(queue, cts.Token));

    }
    //run producer
    public void AddData(byte[] data, int bytesRead)
    {
        bool success = false;
        try
        {
            success = queue.TryAdd(data);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Add loop canceled.");
            queue.CompleteAdding();
        }

        if (success)
        {
            Console.WriteLine(" Add:{0}", data);
        }
        else
        {
            Console.Write(" AddBlocked");
        }

        System.Console.WriteLine("queue count = " + queue.Count);
    }

    private static void ParserWorker(BlockingCollection<byte[]> bc, CancellationToken ct)
    {
        ASCIIEncoding encoder = new ASCIIEncoding();
        String xmlString;

        while (!bc.IsCompleted)
        {
            byte[] nextItem;
            try
            {

                if (!bc.TryTake(out nextItem, 0, ct))
                {
                    Console.WriteLine(" Take Blocked");
                }
                else
                {
                    xmlString = encoder.GetString(nextItem, 0, nextItem.Length);
                    System.Console.WriteLine(xmlString);
                }

            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Taking canceled.");
                break;
            }
        }       
    }

ワイヤーを読み取る(これはスレッドで実行されます):

private void HandleClientComm(object client)
{
      TcpClient tcpClient = (TcpClient)client;
      NetworkStream clientStream = tcpClient.GetStream();

      byte[] message = new byte[8192];
      int bytesRead;

      while (true)
      {
          bytesRead = 0;

          try
          {

              bytesRead = clientStream.Read(message, 0, 4096);
              byte[] temp = new byte[bytesRead];
              Array.Copy(message, temp, bytesRead);

              /*CODE WILL HANG HERE...*/
              ASCIIEncoding encoder = new ASCIIEncoding();
              String xmlString = encoder.GetString(message, 0, message.Length);
              System.Console.WriteLine(xmlString);
              /*DOES NOT GO BEYOND LINE ABOVE */

              handler.AddData(message, bytesRead);  //xmlhandler
           }
           catch (Exception e)
           {
               System.Console.WriteLine(e.ToString());
               break;
           }

           if (bytesRead == 0)
           {
               break;
           }
      }
}

ここで私が間違っていることを誰かに教えてもらえますか?

4

0 に答える 0