そこで、クラスター アプリケーションのメッセージ パッシング プロトコルとして 0MQ を検討しています。私が必要とする一種の非同期通信を提供します。
これまでのところ、プロトタイプの作成に成功しています。いくつかの異なるタイプのパターンを使用してメッセージを送受信できます。
現在、 「ハンドシェイク」PUB/SUB
を介して初期同期で動作するパターンを取得しようとしています。REP/REQ
この方法は、ZeroMQ ガイドの「ノード調整」で説明されています。そして、この方法はかなりうまく機能します。唯一の問題は、0MQ がメッセージの約 50%、または少なくともその中のデータを消費しているように見えることです。
私は Qt を使用しているため、メッセージはQString
. QByteArray
を使用して文字列を にバイトパックしていQDataStream
ます。次に、QByteArray
「ワイヤー」の上に通し、反対側で開梱します。私はこの方法を他のプロトコル (tcp ソケットなど) を介して通信するために頻繁に使用してきましたが、うまく機能します。
PUB/SUB
viaとREQ/REP
1 つの「マネージャー」に接続する 4 つの「ワーカー」があります。ほとんどの実行では、少なくとも 1 つ、最大で 3 つの「ワーカー」が正常に同期します。ただし、そうでない場合は、空の文字列を受け取っています。
4 つのクライアントのログは次のとおりです。
alicia
:
[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client
alicia
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message:
START
[ 09:18:15.341] [Debug] Recieved a SYNC message
SYNC
[09:18:15.341] [Info] Received START signal from manager
[09:18:15.341] [Info] Received message:
2
brenda
:
[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client
brenda
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message:
START
[ 09:18:15.340] [Debug] Recieved a SYNC message
SYNC
[09:18:15.340] [Info] Received START signal from manager
[09:18:15.340] [Info] Received message:
START
[09:18:15.340] [Debug] Sending
0
th message
carlie
:
[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client
carlie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message:
START
[ 09:18:15.340] [Debug] Recieved a SYNC message
[09:18:15.340] [Fatal] carlie
Caught unhandled WASError:
Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message
darcie
:
[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client
darcie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message:
START
[ 09:18:15.341] [Debug] Recieved a SYNC message
[09:18:15.341] [Fatal] darcie
Caught unhandled WASError:
Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message
ほとんどのコードは次のとおりです。
からzmqtools.cpp
void sendQString( zmq::socket_t& socket, QString& str )
{
QByteArray package;
QDataStream packer( &package, QIODevice::WriteOnly );
packer << str;
zmq::message_t msg( package.data(), package.size(), NULL );
WAS_ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}
void recvQString( zmq::socket_t& socket, QString& str )
{
zmq::message_t msg;
WAS_ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
QByteArray package( ( char* )msg.data(), msg.size() );
QDataStream unpacker( &package, QIODevice::ReadOnly );
unpacker >> str;
}
からzmqworker.cpp
ZMQWorker::ZMQWorker( const QString& clientName, QObject *parent ) :
QThread( parent ),
clientName( clientName ),
context( 1 ),
inMessageSocket( context, ZMQ_PULL ),
outMessageSocket( context, ZMQ_PUSH ),
inControlSocket( context, ZMQ_SUB ),
synchronizeSocket( context, ZMQ_REQ )
{
qxtLog->debug() << "Initializing client " << clientName;
inMessageSocket.connect( "tcp://localhost:9900" );
outMessageSocket.connect( "tcp://localhost:9901" );
inControlSocket.connect( "tcp://localhost:9902" );
inControlSocket.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
synchronizeSocket.connect( "tcp://localhost:9903" );
messagePoll.fd = 0;
messagePoll.events = ZMQ_POLLIN;
messagePoll.revents = 0;
messagePoll.socket = inMessageSocket;
controlPoll.fd = 0;
controlPoll.events = ZMQ_POLLIN;
controlPoll.revents = 0;
controlPoll.socket = inControlSocket;
}
void ZMQWorker::run()
{
QString message;
// Wait for start signal from server
bool started = true;
do
{
qxtLog->debug() << "Attempting to receive START signal from server";
recvQString( inControlSocket, message );
qxtLog->info() << "Received message: " << message;
started = message == "START";
} while( !started );
message = "SYNC";
sendQString( synchronizeSocket, message );
recvQString( synchronizeSocket, message );
qxtLog->debug() << "Recieved a SYNC message" << message;
WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );
qxtLog->info() << "Received START signal from manager";
int messagesSent = 0;
forever
{
zmq::poll( &messagePoll, 1, 0 );
if( messagePoll.revents & ZMQ_POLLIN )
{
recvQString( inMessageSocket, message );
qxtLog->info() << "Received message: " << message;
}
zmq::poll( &controlPoll, 1, 0 );
if( controlPoll.revents & ZMQ_POLLIN )
{
recvQString( inControlSocket, message );
qxtLog->info() << "Received message: " << message;
if( message == "STOP" )
break;
}
if( messagesSent < 1000 )
{
qxtLog->debug() << "Sending " << messagesSent << "th message";
QString message = QString::number( messagesSent );
sendQString( outMessageSocket, message );
messagesSent++;
}
}
}
からzmqmanager.cpp
ZMQManager::ZMQManager( const QString& serverName, unsigned clientCount, QObject* parent ) :
QThread( parent ),
serverName( serverName ),
clientCount( clientCount ),
context( 1 ),
inMessageSocket( context, ZMQ_PULL ),
outMessageSocket( context, ZMQ_PUSH ),
outControlSocket( context, ZMQ_PUB ),
synchronizeSocket( context, ZMQ_REP )
{
qxtLog->debug() << "Initializing server " << serverName;
outMessageSocket.bind( "tcp://*:9900" );
inMessageSocket.bind( "tcp://*:9901" );
outControlSocket.bind( "tcp://*:9902" );
synchronizeSocket.bind( "tcp://*:9903" );
messagePoll.fd = 0;
messagePoll.events = ZMQ_POLLIN;
messagePoll.revents = 0;
messagePoll.socket = inMessageSocket;
synchronizePoll.fd = 0;
synchronizePoll.events = ZMQ_POLLIN;
synchronizePoll.revents = 0;
synchronizePoll.socket = synchronizeSocket;
}
void ZMQManager::run()
{
QString message;
unsigned clientsConnected = 0;
do
{
qxtLog->debug() << "Publishing START signal";
message = "START";
sendQString( outControlSocket, message );
zmq::poll( &synchronizePoll, 1, 5000 );
if( synchronizePoll.revents & ZMQ_POLLIN )
{
qxtLog->debug() << "Checking for response to START signal";
recvQString( synchronizeSocket, message );
qxtLog->debug() << "Recieved a SYNC message" << message;
WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );
sendQString( synchronizeSocket, message );
clientsConnected++;
}
} while( clientsConnected < clientCount );
qxtLog->info() << "Started and syncrhonized with clients";
unsigned messagesSent = 0;
unsigned messagesReceived = 0;
do
{
zmq::poll( &messagePoll, 1, 0 );
if( messagePoll.revents & ZMQ_POLLIN )
{
recvQString( inMessageSocket, message );
qxtLog->info() << "Received message: " << message;
messagesReceived++;
}
if( messagesSent < clientCount * 1000 )
{
qxtLog->debug() << "Sending a message";
message = QString::number( messagesSent );
sendQString( outMessageSocket, message );
messagesSent++;
}
} while( messagesSent < clientCount * 1000 && messagesReceived < clientCount * 1000 );
message = "STOP";
sendQString( outControlSocket, message );
}
だから、ついに私の質問:
- このコードで何かを見落としていますか?
- ZeroMQ メッセージのパック/アンパックが間違っていますか?
- 同期の管理が
REQ/REP
間違っていませんか?
ZMQ、特に混合パターンとパターンの経験がある人からの洞察をいただければPUB/SUB
幸いPUSH/PULL
ですREQ/REP
。