0

ZeroMQを介してC構造体を送信することになっているプログラムを書いています。したがって、私はGoogleのProtocolBuffersを使用して構造体をシリアル化しています。

サブスクライバー側が何も受信しないという問題が発生しました。パブリッシャーは「メッセージが正常に送信されました」と出力するので、サブスクライバー側でエラーが発生したと思います。

出版社:

int main (void)
{
    Message protomsg = MESSAGE__INIT;
        void *buf;
    unsigned len;

        void *context = zmq_ctx_new();
        void *subscriber = zmq_socket(context, ZMQ_PUB);
    zmq_bind(subscriber, "ipc://my.sock");

        //Initialising protomsg (not so important)

        //sending message

        len = message__get_packed_size(&protomsg);
    buf = malloc(len);
    message__pack(&protomsg, buf);

    zmq_msg_t output;
    zmq_msg_init_size(&output, len);
    zmq_msg_init_data(&output, buf, len, NULL, NULL);
    if(zmq_msg_send(&output, subscriber, 0) == -1)
        perror("Error sending message \n");
    else
        printf("Message successfully sent \n");
    zmq_msg_close(&output);
        free(buf);

        zmq_close (subscriber);
        zmq_ctx_destroy (context);
        return 0;
}

サブスクライバー:

    int main (void){
        Message *protomsg;
            void *context = zmq_ctx_new ();
            void *publisher = zmq_socket (context, ZMQ_SUB);
            zmq_connect(publisher, "ipc://my.sock");
            zmq_setsockopt(publisher, ZMQ_SUBSCRIBE, "", 0);

            // Read packed message from ZMQ.
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        if(zmq_msg_recv(&msg, publisher, 0) == -1)
            perror("Error receiving message \n");
        else
            printf("Message received");
        memcpy((void *)protomsg, zmq_msg_data(&msg), zmq_msg_size(&msg));

            // Unpack the message using protobuf-c.
        protomsg = message__unpack(NULL, zmq_msg_size(&msg), (void *)&data);   
        if (protomsg == NULL)
        {
            fprintf(stderr, "error unpacking incoming message\n");
            exit(1);
        }

            printf("Address: %u, Type: %u, Information[0]: %u, Information[1]: %u \n", protomsg->address-48, protomsg->frametype, protomsg->information[0], protomsg->information[1]);
        zmq_msg_close (&msg);

        // Free the unpacked message
        message__free_unpacked(protomsg, NULL);

            //close context,socket..
}
4

1 に答える 1

0

誰かがまだこれを気にしているかどうかはわかりませんが、ここに行きます...これはタイミングの問題であるという@Steve-oに同意しますが、問題はパブリッシャーソケットを閉じるのが早すぎることだと思います.

パブリッシャー コードはメッセージをパブリッシュし、すぐにソケットを閉じてコンテキストを終了します。したがって、メッセージはパブリッシャーに数ミリ秒間存在し、その後永久に消えてしまいます。

パブリッシャーを最初に実行すると、それが実行され、終了し、メッセージが消えます。サブスクライバを起動すると、存在しない IPC ソケットに接続しようとします。ZeroMQ はこれを許可し、加入者は接続先の IPC ソケットが存在するまでブロックします。

私は ZeroMQ IPC ソース コードを確認していませんが、サブスクライバーが定期的にパブリッシャー ソケットに接続しようとしていると思われます。パブリッシャーを再度実行すると、動作する可能性がありますが、深刻な競合状態が発生しています。ZeroMQ ワーカーが再試行しようとしていたまさにその瞬間にパブリッシャーを開始すると、接続が発生し、パブリッシャーがすべてを破棄する前にメッセージを受け取ることさえあります。

問題が構造体とプロトブフとは何の関係もないと確信しています。ZeroMQ の観点からは、バイトを送信しているだけです。違いはありません。ZeroMQ 文字列のテスト ケースが ZeroMQ 構造体のテスト ケースと完全に同一である場合、おそらくコードの変更により数ナノ秒が追加または削除され、競合状態を間違った方法で破ることができました。

具体的な提案:

  • パブリッシャーのソケットの名前をサブスクライバーではなく「パブリッシャー」に変更します (コピー/貼り付けエラー)
  • zmq_close (発行者) の直前に 30 秒間のスリープを追加します。
  • うまくいけば、これでテストコードの問題が修正されます
  • これで問題が解決しない場合は、tcp トランスポートに切り替えることを検討し、wiresharkを使用して実際に何が起こっているのかを診断してください。
于 2012-10-08T01:10:53.277 に答える