0

C を使用して、ZeroMQ ソケットを介して protobuf でシリアル化されたメッセージを受信できません。クライアントによって入力されたメッセージをシリアル化し、zhelpers.h で定義された s_send() 関数を使用してこのバッファをサーバーに送信しました。サーバーコードは、サンプルとして zeromq パッケージにバンドルされている同じテスト コードです。

これが私のクライアント側です:

#include "amessage.pb-c.h"
#include "zhelpers.h"

int main (void)
{

    AMessage msg = AMESSAGE__INIT; // AMessage
    void *buf;                     // Buffer to store serialized data
    unsigned len;

    printf ("Connecting to server...\n");
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);

    char buffer[256] = "";

    printf("[client] :");
    scanf("%s", buffer );

    msg.csmsg = buffer;
    len = amessage__get_packed_size(&msg);        
    buf = malloc(len);

    printf("[client]: pack msg len : %d\n ", len);
    printf("Sent msg : %d\n", buf);

    amessage__pack(&msg,buf);

    s_send(requester, buf);

    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

そしてサーバー側:

#include "zhelpers.h"
#include <pthread.h>
#include <stdlib.h>
#include "amessage.pb-c.h"

#define MAX_MSG_SIZE 256

static size_t read_buffer (unsigned max_length, unsigned char *out)
{
   size_t cur_len = 0, nread;
   uint8_t c;
   while ((nread=fread(out + cur_len, 1, max_length - cur_len, stdin)) != 0)
   {
       cur_len += nread;
       if (cur_len == max_length)
       {
           fprintf(stderr, "[server]: max message length exceeded\n");
           exit(1);
       }
   }
   return cur_len;
 }


 static void * worker_routine (void *context) 
 {

    AMessage *msg;

    uint8_t buf[MAX_MSG_SIZE];
   char buffer[256];

   //  Socket to talk to dispatcher
   void *receiver = zmq_socket (context, ZMQ_REP);
   zmq_connect (receiver, "inproc://workers");

   while (1) {

          uint8_t *string = s_recv (receiver);

       if(string == 0)
            printf("[server]: Error: In receiving msg.\n");
       else
       {

        size_t msg_len = read_buffer (MAX_MSG_SIZE, string);
        printf("[server]: client msg len is: %d.\n", msg_len);
        msg = amessage__unpack(NULL, msg_len, string);   
        if (msg == NULL)
        {
            fprintf(stderr, "[server]: error unpacking incoming message\n");
            exit(1);
        }
        printf ("[client]: %s \n", msg->csmsg);

    }
    amessage__free_unpacked(msg, NULL);
    free (string);
    //  Do some 'work'
    sleep (1);

   }
   zmq_close (receiver);
   return NULL;
}

  int main (void)
  {
   void *context = zmq_ctx_new ();
   void *clients = zmq_socket (context, ZMQ_ROUTER);
   zmq_bind (clients, "tcp://*:5555");

   void *workers = zmq_socket (context, ZMQ_DEALER);
   zmq_bind (workers, "inproc://workers");

   //  Launch pool of worker threads
   int thread_nbr;
   for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
   }
    //  Connect work threads to client threads via a queue proxy
    zmq_proxy (clients, workers, NULL);

    zmq_close (clients);
    zmq_close (workers);

    zmq_ctx_destroy (context);
    return 0;
  }

私が間違っていることは何ですか?

4

1 に答える 1

2

s_send()引数として C 文字列を期待する whichを使用しており、strlen()そのサイズを決定するために呼び出します。ただし、プロトコル バッファ データはバイナリ データであり、メッセージ内の任意の場所にヌル バイトが含まれる場合があります。

代わりzmq_send()に、メッセージの長さを使用してzmq_msg_init_size()関数に渡します。

于 2013-03-12T06:38:02.723 に答える