4

Windows 用のシリアル ポート ソフトウェアを作成しています。パフォーマンスを向上させるために、非同期 I/O を使用するようにルーチンを変換しようとしています。私はコードを作成し、かなりうまく機能していますが、これはまだ初心者であり、プログラムのパフォーマンスをさらに改善したいと考えています。プログラムのストレス テスト中 (つまり、高いボーレートで可能な限り高速にポートとの間でデータをバーストする)、CPU 負荷が非常に高くなります。

Windows で非同期 I/O とマルチスレッドを使用した経験のある方がいらっしゃいましたら、私のプログラムをご覧いただければ幸いです。主な懸念事項が 2 つあります。

  • 非同期 I/O は正しく実装されていますか? 最後に独自のデータを使用して独自の OVERLAPPED 構造体を実装することにより、ユーザーデータをコールバック関数に渡すことができることを示唆する、かなり信頼できるソースをネット上で見つけました。これはうまく機能しているように見えますが、私には少し「ハック」に見えます。また、同期/ポーリングから非同期/コールバックに変換したとき、プログラムのパフォーマンスはそれほど改善されず、何か間違ったことをしているのではないかと疑われました。

  • FIFO データ バッファに STL std::deque を使用するのは正気ですか? プログラムは現在作成されているため、一度に 1 バイトのデータしか受信できないため、処理する必要はありません。受信するデータの量がわからないため、無限の量になる可能性があります。この一度に 1 バイトずつでは、データを割り当てる必要があるときに deque の行の背後で動作が遅くなると思います。また、deque がスレッドセーフであるとは信じていません (そうすべきでしょうか?)。STL deque の使用が適切でない場合、より適切なデータ型を使用するための提案はありますか? 静的配列ベースの循環リング バッファ?

コードに関するその他のフィードバックも大歓迎です。


シリアル ルーチンは、シリアル I/O 関連のすべてを処理する「Comport」という親クラスを持つように実装されています。このクラスから、マルチスレッド バージョンである「ThreadedComport」という別のクラスを継承します。

ThreadedComport クラス (関連部分)

class ThreadedComport : public Comport
{
  private:

    HANDLE        _hthread_port;                 /* thread handle      */
    HANDLE        _hmutex_port;                  /* COM port access    */
    HANDLE        _hmutex_send;                  /* send buffer access */
    HANDLE        _hmutex_rec;                   /* rec buffer access  */

    deque<uint8>  _send_buf;
    deque<uint8>  _rec_buf;
    uint16        _data_sent;
    uint16        _data_received;

    HANDLE        _hevent_kill_thread;
    HANDLE        _hevent_open;
    HANDLE        _hevent_close;
    HANDLE        _hevent_write_done;
    HANDLE        _hevent_read_done;
    HANDLE        _hevent_ext_send;              /* notifies external thread */
    HANDLE        _hevent_ext_receive;           /* notifies external thread */

    typedef struct
    {
      OVERLAPPED       overlapped;
      ThreadedComport* caller;                  /* add user data to struct */
    } OVERLAPPED_overlap;

    OVERLAPPED_overlap _send_overlapped;
    OVERLAPPED_overlap _rec_overlapped;
    uint8*             _write_data;
    uint8              _read_data;
    DWORD              _bytes_read;

    static DWORD WINAPI _tranceiver_thread (LPVOID param);
    void                _send_data         (void);
    void                _receive_data      (void);
    DWORD               _wait_for_io       (void);

    static void WINAPI  _send_callback     (DWORD dwErrorCode,
                                            DWORD dwNumberOfBytesTransfered,
                                            LPOVERLAPPED lpOverlapped);
    static void WINAPI  _receive_callback  (DWORD dwErrorCode,
                                            DWORD dwNumberOfBytesTransfered,
                                            LPOVERLAPPED lpOverlapped);

};

CreateThread() によって作成されたメイン スレッド ルーチン:

DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
  ThreadedComport* caller = (ThreadedComport*) param;

  HANDLE handle_array [3] =
  {
    caller->_hevent_kill_thread,                 /* WAIT_OBJECT_0 */
    caller->_hevent_open,                        /* WAIT_OBJECT_1 */
    caller->_hevent_close                        /* WAIT_OBJECT_2 */
  };

  DWORD result;

  do
  {
    /* wait for anything to happen */
    result = WaitForMultipleObjects(3,
                                    handle_array,
                                    false,       /* dont wait for all */
                                    INFINITE);

    if(result == WAIT_OBJECT_1 )                 /* open? */
    {
      do                                         /* while port is open, work */
      {
        caller->_send_data();
        caller->_receive_data();
        result = caller->_wait_for_io();         /* will wait for the same 3 as in handle_array above,
                                                    plus all read/write specific events */

      } while (result != WAIT_OBJECT_0 &&        /* while not kill thread */
               result != WAIT_OBJECT_2);         /* while not close port */
    }
    else if(result == WAIT_OBJECT_2)             /* close? */
    {
      ;                                          /* do nothing */
    }

  } while (result != WAIT_OBJECT_0);             /* kill thread? */

  return 0;
}

次に、次の 3 つの関数を呼び出します。

void ThreadedComport::_send_data (void)
{
  uint32 send_buf_size;

  if(_send_buf.size() != 0)                      // anything to send?
  {
    WaitForSingleObject(_hmutex_port, INFINITE);
      if(_is_open)                               // double-check port
      {
        bool result;

        WaitForSingleObject(_hmutex_send, INFINITE);
          _data_sent = 0;
          send_buf_size = _send_buf.size();
          if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
          {
            send_buf_size = _MAX_MESSAGE_LENGTH;
          }
          _write_data = new uint8 [send_buf_size];


          for(uint32 i=0; i<send_buf_size; i++)
          {
            _write_data[i] = _send_buf.front();
            _send_buf.pop_front();
          }
          _send_buf.clear();
        ReleaseMutex(_hmutex_send);


        result = WriteFileEx (_hcom,              // handle to output file
                              (void*)_write_data, // pointer to input buffer
                              send_buf_size,      // number of bytes to write
                              (LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
                              (LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);

        SleepEx(INFINITE, true);                 // Allow callback to come

        if(result == false)
        {
          // error handling here
        }

      } // if(_is_open)
    ReleaseMutex(_hmutex_port);
  }
  else /* nothing to send */
  {
    SetEvent(_hevent_write_done);                // Skip write
  }
}


void ThreadedComport::_receive_data (void)
{
  WaitForSingleObject(_hmutex_port, INFINITE);

    if(_is_open)
    {
      BOOL  result;

      _bytes_read = 0;
      result = ReadFileEx (_hcom,                  // handle to output file
                           (void*)&_read_data,     // pointer to input buffer
                           1,                      // number of bytes to read
                           (OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
                           (LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);

      SleepEx(INFINITE, true);                     // Allow callback to come

      if(result == FALSE)
      {
        DWORD last_error = GetLastError();
        if(last_error == ERROR_OPERATION_ABORTED)  // disconnected ?
        {
          close();                                 // close the port
        }
      }
    }

  ReleaseMutex(_hmutex_port);
}



DWORD ThreadedComport::_wait_for_io (void)
{
  DWORD result;
  bool  is_write_done = false;
  bool  is_read_done  = false;

  HANDLE handle_array [5] =
  {
    _hevent_kill_thread,
    _hevent_open,
    _hevent_close,
    _hevent_write_done,
    _hevent_read_done
  };


  do /* COM port message pump running until sending / receiving is done */
  {
    result = WaitForMultipleObjects(5,
                        handle_array,
                        false,                     /* dont wait for all */
                        INFINITE);

    if(result <= WAIT_OBJECT_2)
    {
      break;                                       /* abort */
    }
    else if(result == WAIT_OBJECT_3)               /* write done */
    {
      is_write_done = true;
      SetEvent(_hevent_ext_send);
    }
    else if(result == WAIT_OBJECT_4)               /* read done */
    {
      is_read_done = true;

      if(_bytes_read > 0)
      {
        uint32 errors = 0;

        WaitForSingleObject(_hmutex_rec, INFINITE);
          _rec_buf.push_back((uint8)_read_data);
          _data_received += _bytes_read;

          while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
          {
            _rec_buf.pop_front();
          }

        ReleaseMutex(_hmutex_rec);
        _bytes_read = 0;

        ClearCommError(_hcom, &errors, NULL);
        SetEvent(_hevent_ext_receive);
      }
    }
  } while(!is_write_done || !is_read_done);

  return result;
}

非同期 I/O コールバック関数:

void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
                                             DWORD dwNumberOfBytesTransfered,
                                             LPOVERLAPPED lpOverlapped)
{
  ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;

  if(dwErrorCode == 0)                           // no errors
  {
    if(dwNumberOfBytesTransfered > 0)
    {
      _this->_data_sent = dwNumberOfBytesTransfered;
    }
  }


  delete [] _this->_write_data;                  /* always clean this up */
  SetEvent(lpOverlapped->hEvent);
}


void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
                                                DWORD dwNumberOfBytesTransfered,
                                                LPOVERLAPPED lpOverlapped)
{
  if(dwErrorCode == 0)                           // no errors
  {
    if(dwNumberOfBytesTransfered > 0)
    {
      ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
      _this->_bytes_read = dwNumberOfBytesTransfered;
    }
  }

  SetEvent(lpOverlapped->hEvent);
}
4

3 に答える 3

1

あなたのコードは最適ではない設計になっていると思います。

  • あまりにも多くのデータ構造を共有しているスレッドが多すぎると思います。1 つのポートのシリアル デバイス IO のすべての処理を 1 つのスレッドに配置し、同期されたコマンド/データ キューを IO スレッドとすべてのクライアント スレッドの間に配置する必要があると思います。IO スレッドがキュー内のコマンド/データを監視するようにします。

  • 送信されたイベントごとにいくつかのバッファーを割り当てて解放しているようです。それを避けてください。すべての IO を 1 つのスレッドに保持すると、1 つのバッファーを再利用できます。とにかくメッセージのサイズを制限しています.1つの十分な大きさのバッファを事前に割り当てることができます.

  • 送信したいバイトを に入れるのstd::dequeは最適ではありません。の連続メモリ ブロックにそれらをシリアル化する必要がありWriteFile()ます。代わりに、1 つの IO スレッドと他のスレッドの間である種のコマンド/データ キューを使用すると、クライアント スレッドにメモリの連続チャンクを一度に提供させることができます。

  • 一度に 1 バイトずつ読み取るのもばかげているようです。シリアル デバイスで機能しない場合を除き、十分な大きさのバッファを に提供できますReadFileEx()。実際に読み取ることができたバイト数を返します。もちろん、私が間違っていない限り、ブロックするべきではありません。

  • オーバーラップした IO がSleepEx()呼び出しを使用して終了するのを待っています。オーバーラップした IO のポイントは何ですか?

于 2011-03-07T15:46:14.580 に答える