問題
同じマシン上の 2 つのプロセスが TCP/IP を使用して通信するプロジェクトにboost::asioを使用しています。一方は他方が読み取るデータを生成しますが、断続的に接続を介してデータが送信されないという問題が発生しています。async tcp echo server exampleに基づいて、これを以下の非常に単純な例に要約しました。
プロセス (以下のソース コード) は問題なく開始され、送信者から受信者に高速でデータが配信されます。その後、突然、約 5 秒間、データがまったく配信されなくなります。その後、次の不可解な一時停止まで、データが再び配信されます。この 5 秒間、プロセスは 0% の CPU を消費し、他のプロセスは特に何もしていないようです。一時停止は常に同じ長さ (5 秒) です。
私はこれらの失速を取り除く方法とそれらの原因を理解しようとしています.
実行全体の CPU 使用率:

実行の途中で CPU 使用率が 3 回低下していることに注意してください。「実行」とは、サーバー プロセスとクライアント プロセスの 1 回の呼び出しです。これらの急落の間、データは配信されませんでした。ディップの数とそのタイミングは実行ごとに異なります。まったくディップがない場合もあれば、ディップが多い場合もあります。
読み取りバッファーのサイズを変更することで、これらのストールの「可能性」に影響を与えることができます。たとえば、読み取りバッファーを送信チャンク サイズの倍数にすると、この問題はほとんど解消されますが、完全ではないように見えます。
ソースとテストの説明
Boost 1.43 と Boost 1.45 を使用して、Visual Studio 2005 で以下のコードをコンパイルしました。Windows Vista 64 ビット (クアッドコア) と Windows 7 64 ビット (クアッドコアとデュアルコアの両方) でテストしました。
サーバーは接続を受け入れ、データの読み取りと破棄を行うだけです。読み取りが実行されるたびに、新しい読み取りが発行されます。
クライアントはサーバーに接続し、一連のパケットを送信キューに入れます。この後、一度に 1 つずつパケットを書き込みます。書き込みが完了するたびに、キュー内の次のパケットが書き込まれます。別のスレッドがキューのサイズを監視し、これを毎秒 stdout に出力します。io ストールの間、キューのサイズはまったく同じままです。
scatter io (1 つのシステム コールで複数のパケットを書き込む) を使用しようとしましたが、結果は同じです。を使用して Boost で IO 完了ポートを無効にするBOOST_ASIO_DISABLE_IOCPと、問題は解決したように見えますが、スループットが大幅に低下します。
// Example is adapted from async_tcp_echo_server.cpp which is
// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Start program with -s to start as the server
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#include <iostream>
#include <tchar.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#define PORT "1234"
using namespace boost::asio::ip;
using namespace boost::system;
class session {
public:
session(boost::asio::io_service& io_service) : socket_(io_service) {}
void do_read() {
socket_.async_read_some(boost::asio::buffer(data_, max_length),
boost::bind(&session::handle_read, this, _1, _2));
}
boost::asio::ip::tcp::socket& socket() { return socket_; }
protected:
void handle_read(const error_code& ec, size_t bytes_transferred) {
if (!ec) {
do_read();
} else {
delete this;
}
}
private:
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server {
public:
explicit server(boost::asio::io_service& io_service)
: io_service_(io_service)
, acceptor_(io_service, tcp::endpoint(tcp::v4(), atoi(PORT)))
{
session* new_session = new session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session, _1));
}
void handle_accept(session* new_session, const error_code& ec) {
if (!ec) {
new_session->do_read();
new_session = new session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session, _1));
} else {
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
};
class client {
public:
explicit client(boost::asio::io_service &io_service)
: io_service_(io_service)
, socket_(io_service)
, work_(new boost::asio::io_service::work(io_service))
{
io_service_.post(boost::bind(&client::do_init, this));
}
~client() {
packet_thread_.join();
}
protected:
void do_init() {
// Connect to the server
tcp::resolver resolver(io_service_);
tcp::resolver::query query(tcp::v4(), "localhost", PORT);
tcp::resolver::iterator iterator = resolver.resolve(query);
socket_.connect(*iterator);
// Start packet generation thread
packet_thread_.swap(boost::thread(
boost::bind(&client::generate_packets, this, 8000, 5000000)));
}
typedef std::vector<unsigned char> packet_type;
typedef boost::shared_ptr<packet_type> packet_ptr;
void generate_packets(long packet_size, long num_packets) {
// Add a single dummy packet multiple times, then start writing
packet_ptr buf(new packet_type(packet_size, 0));
write_queue_.insert(write_queue_.end(), num_packets, buf);
queue_size = num_packets;
do_write_nolock();
// Wait until all packets are sent.
while (long queued = InterlockedExchangeAdd(&queue_size, 0)) {
std::cout << "Queue size: " << queued << std::endl;
Sleep(1000);
}
// Exit from run(), ignoring socket shutdown
work_.reset();
}
void do_write_nolock() {
const packet_ptr &p = write_queue_.front();
async_write(socket_, boost::asio::buffer(&(*p)[0], p->size()),
boost::bind(&client::on_write, this, _1));
}
void on_write(const error_code &ec) {
if (ec) { throw system_error(ec); }
write_queue_.pop_front();
if (InterlockedDecrement(&queue_size)) {
do_write_nolock();
}
}
private:
boost::asio::io_service &io_service_;
tcp::socket socket_;
boost::shared_ptr<boost::asio::io_service::work> work_;
long queue_size;
std::list<packet_ptr> write_queue_;
boost::thread packet_thread_;
};
int _tmain(int argc, _TCHAR* argv[]) {
try {
boost::asio::io_service io_svc;
bool is_server = argc > 1 && 0 == _tcsicmp(argv[1], _T("-s"));
std::auto_ptr<server> s(is_server ? new server(io_svc) : 0);
std::auto_ptr<client> c(is_server ? 0 : new client(io_svc));
io_svc.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
したがって、私の質問は基本的に次のとおりです。
これらの屋台を取り除くにはどうすればよいですか?
これが起こる原因は何ですか?
更新:上記とは逆に、ディスク アクティビティとの相関関係があるようです。そのため、テストの実行中にディスク上で大きなディレクトリ コピーを開始すると、IO ストールの頻度が高くなる可能性があります。これは、これがWindows IO の優先順位付けであることを示している可能性がありますか? 一時停止は常に同じ長さなので、OS io コードのどこかでタイムアウトのように聞こえます...