3

そこで、クラスター アプリケーションのメッセージ パッシング プロトコルとして 0MQ を検討しています。私が必要とする一種の非同期通信を提供します。

これまでのところ、プロトタイプの作成に成功しています。いくつかの異なるタイプのパターンを使用してメッセージを送受信できます。

現在、 「ハンドシェイク」PUB/SUBを介して初期同期で動作するパターンを取得しようとしています。REP/REQこの方法は、ZeroMQ ガイドの「ノード調整」で説明されています。そして、この方法はかなりうまく機能します。唯一の問題は、0MQ がメッセージの約 50%、または少なくともその中のデータを消費しているように見えることです。

私は Qt を使用しているため、メッセージはQString. QByteArrayを使用して文字列を にバイトパックしていQDataStreamます。次に、QByteArray「ワイヤー」の上に通し、反対側で開梱します。私はこの方法を他のプロトコル (tcp ソケットなど) を介して通信するために頻繁に使用してきましたが、うまく機能します。

PUB/SUBviaとREQ/REP1 つの「マネージャー」に接続する 4 つの「ワーカー」があります。ほとんどの実行では、少なくとも 1 つ、最大で 3 つの「ワーカー」が正常に同期します。ただし、そうでない場合は、空の文字列を受け取っています。

4 つのクライアントのログは次のとおりです。

alicia:

[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       alicia
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.341] [Debug] Recieved a SYNC message
                       SYNC
[09:18:15.341] [Info] Received START signal from manager
[09:18:15.341] [Info] Received message: 
                      2

brenda:

[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       brenda
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.340] [Debug] Recieved a SYNC message
                       SYNC
[09:18:15.340] [Info] Received START signal from manager
[09:18:15.340] [Info] Received message: 
                      START
[09:18:15.340] [Debug] Sending 
                       0
                       th message

carlie:

[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       carlie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.340] [Debug] Recieved a SYNC message
[09:18:15.340] [Fatal] carlie
                        Caught unhandled WASError:
                       Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message

darcie:

[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       darcie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.341] [Debug] Recieved a SYNC message
[09:18:15.341] [Fatal] darcie
                        Caught unhandled WASError:
                       Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message

ほとんどのコードは次のとおりです。

からzmqtools.cpp

void sendQString( zmq::socket_t& socket, QString& str )
{
    QByteArray package;
    QDataStream packer( &package, QIODevice::WriteOnly );
    packer << str;
    zmq::message_t msg( package.data(), package.size(), NULL );
    WAS_ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}


void recvQString( zmq::socket_t& socket, QString& str )
{
    zmq::message_t msg;
    WAS_ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
    QByteArray package( ( char* )msg.data(), msg.size() );
    QDataStream unpacker( &package, QIODevice::ReadOnly );
    unpacker >> str;
}

からzmqworker.cpp

    ZMQWorker::ZMQWorker( const QString& clientName, QObject *parent ) :
    QThread( parent ),
    clientName( clientName ),
    context( 1 ),
    inMessageSocket( context, ZMQ_PULL ),
    outMessageSocket( context, ZMQ_PUSH ),
        inControlSocket( context, ZMQ_SUB ),
    synchronizeSocket( context, ZMQ_REQ )
{
    qxtLog->debug() << "Initializing client " << clientName;

    inMessageSocket.connect( "tcp://localhost:9900" );
    outMessageSocket.connect( "tcp://localhost:9901" );
    inControlSocket.connect( "tcp://localhost:9902" );
    inControlSocket.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    synchronizeSocket.connect( "tcp://localhost:9903" );

    messagePoll.fd = 0;
    messagePoll.events = ZMQ_POLLIN;
    messagePoll.revents = 0;
    messagePoll.socket = inMessageSocket;

    controlPoll.fd = 0;
    controlPoll.events = ZMQ_POLLIN;
    controlPoll.revents = 0;
    controlPoll.socket = inControlSocket;
}

void ZMQWorker::run()
{
    QString message;
    // Wait for start signal from server
    bool started = true;
    do
    {
        qxtLog->debug() << "Attempting to receive START signal from server";
        recvQString( inControlSocket, message );
        qxtLog->info() << "Received message: " << message;
        started = message == "START";
    } while( !started );

    message = "SYNC";
    sendQString( synchronizeSocket, message );
    recvQString( synchronizeSocket, message );
    qxtLog->debug() << "Recieved a SYNC message" << message;
    WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );

    qxtLog->info() << "Received START signal from manager";

    int messagesSent = 0;
    forever
    {

        zmq::poll( &messagePoll, 1, 0 );
        if( messagePoll.revents & ZMQ_POLLIN )
        {
            recvQString( inMessageSocket, message );
            qxtLog->info() << "Received message: " << message;
        }

        zmq::poll( &controlPoll, 1, 0 );
        if( controlPoll.revents & ZMQ_POLLIN )
        {
            recvQString( inControlSocket, message );
            qxtLog->info() << "Received message: " << message;
            if( message == "STOP" )
                break;
        }


        if( messagesSent < 1000 )
        {
            qxtLog->debug() << "Sending " << messagesSent << "th message";
            QString message = QString::number( messagesSent );
            sendQString( outMessageSocket, message );
            messagesSent++;
        }
    }
}

からzmqmanager.cpp

ZMQManager::ZMQManager( const QString& serverName, unsigned clientCount, QObject* parent ) :
    QThread( parent ),
    serverName( serverName ),
    clientCount( clientCount ),
    context( 1 ),
    inMessageSocket( context, ZMQ_PULL ),
    outMessageSocket( context, ZMQ_PUSH ),
    outControlSocket( context, ZMQ_PUB ),
    synchronizeSocket( context, ZMQ_REP )
{
    qxtLog->debug() << "Initializing server " << serverName;


    outMessageSocket.bind( "tcp://*:9900" );
    inMessageSocket.bind( "tcp://*:9901" );
    outControlSocket.bind( "tcp://*:9902" );
    synchronizeSocket.bind( "tcp://*:9903" );


    messagePoll.fd = 0;
    messagePoll.events = ZMQ_POLLIN;
    messagePoll.revents = 0;
    messagePoll.socket = inMessageSocket;

    synchronizePoll.fd = 0;
    synchronizePoll.events = ZMQ_POLLIN;
    synchronizePoll.revents = 0;
    synchronizePoll.socket = synchronizeSocket;
}

void ZMQManager::run()
{
    QString message;
    unsigned clientsConnected = 0;
    do
    {
        qxtLog->debug() << "Publishing START signal";
        message = "START";
        sendQString( outControlSocket, message );

        zmq::poll( &synchronizePoll, 1, 5000 );
        if( synchronizePoll.revents & ZMQ_POLLIN )
        {
            qxtLog->debug() << "Checking for response to START signal";
            recvQString( synchronizeSocket, message );
            qxtLog->debug() << "Recieved a SYNC message" << message;
            WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );
            sendQString( synchronizeSocket, message );
            clientsConnected++;
        }

    } while( clientsConnected < clientCount );

    qxtLog->info() << "Started and syncrhonized with clients";

    unsigned messagesSent = 0;
    unsigned messagesReceived = 0;
    do
    {
            zmq::poll( &messagePoll, 1, 0 );
        if( messagePoll.revents & ZMQ_POLLIN )
        {
            recvQString( inMessageSocket, message );
            qxtLog->info() << "Received message: " << message;
            messagesReceived++;
        }

        if( messagesSent < clientCount * 1000 )
        {
            qxtLog->debug() << "Sending a message";
            message = QString::number( messagesSent );
            sendQString( outMessageSocket, message );
            messagesSent++;
        }

    } while( messagesSent < clientCount * 1000 && messagesReceived < clientCount * 1000 );

    message = "STOP";
    sendQString( outControlSocket, message );

}

だから、ついに私の質問:

  • このコードで何かを見落としていますか?
  • ZeroMQ メッセージのパック/アンパックが間違っていますか?
  • 同期の管理がREQ/REP間違っていませんか?

ZMQ、特に混合パターンとパターンの経験がある人からの洞察をいただければPUB/SUB幸いPUSH/PULLですREQ/REP

4

1 に答える 1

8

問題は、コンストラクターを使用すると、ZeroMQ がデータをコピーしないことでした。

zmq::message_t( void*, int size );

代わりに、ZeroMQ は、提供されたバッファーをメッセージのストレージとして使用するだけです。そのため、メッセージが実際に送信される前にそのバッファが解放または上書きされると、ガベージまたはセグ フォールトが発生します。

解決策は比較的簡単です。代わりに、次のコンストラクターを使用します。

zmq::message_t( int size );

指定されたサイズのバッファを作成します。次に、送信する前に、手動でデータをメッセージに手動でコピーするだけです。

修正され、現在機能している送信機能は次のとおりです。

void sendQString( zmq::socket_t& socket, QString& str )
{
    QByteArray package;
    QDataStream packer( &package, QIODevice::WriteOnly );
    packer << str;
    qxtLog->debug() << "sending a message of " << package.size() << " bytes";
    qxtLog->debug() << "the message says " << str;
    zmq::message_t msg( package.size() );
    memcpy( msg.data(), package.data(), package.size() );
    ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}

void recvQString( zmq::socket_t& socket, QString& str )
{    
    zmq::message_t msg;
    ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
    qxtLog->debug() << "received a message of " << (int)msg.size() << " bytes";
    QByteArray package( ( char* )msg.data(), msg.size() );
    QDataStream unpacker( &package, QIODevice::ReadOnly );
    unpacker >> str;
    qxtLog->debug() << "the message says " << str;
}

freenode の #zeromq チャンネルの人たちのおかげで、ようやく問題を特定することができました。

于 2011-08-20T00:31:12.733 に答える