3

zmqppを使用して PUB/SUB 接続を作成し、ヘッダーのみの C++11 バージョンのmsgpack-cを使用してパブリッシャーからサブスクライバーにデータを送信したいと考えています。

int64_tサイト運営者は 2 つの数字 --header_1header_2-- の後にstd::vector<T>-- --を送信する必要があります。dataここで、は組み合わせTによって決まります。(header_1, header_2)

msgpack と zmqpp を組み合わせる方法の例はそれほど多くないので、私が思いついたアイデアは、 を使用して 3 部構成のメッセージを送信することzmqpp::message::add/add_rawです。各パーツは、msgpack を使用してパック/アンパックされます。

パブリッシャーは、次のように 1 つのデータ パーツをパックします。

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());

そして、レシーバーは次のように展開します。

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
                static_cast<const char*>(msg.raw_data(0)),
                msg.size(0));
unpackedData.get().convert(&header_1);

コードを実行すると、サブスクライバー側で次のエラーが発生します。

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
  what():  insufficient bytes
Aborted

また、zmqpp はadd()3 回しか呼び出していないにもかかわらず、5 部構成のメッセージを生成したようです。

Q1: データを正しくパック/アンパックしていますか?

Q2: これは、 zmqpp を使用して msgpack バッファーを送信するための適切な方法ですか?

コードの重要な部分は次のとおりです。

出版社

zmqpp::socket publisherSock;
/* connection setup stuff ...*/

// forever send data to the subscribers
while(true)
{
    zmqpp::message msg;

    // meta info about the data
    int64_t header_1 = 1234567;
    int64_t header_2 = 89;
    // sample data
    std::vector<double> data;
    data.push_back(1.2);
    data.push_back(3.4);
    data.push_back(5.6);


    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_1);
        msg.add(buffer.data(), buffer.size());
        cout << "header_1:" << header_1 << endl;  // header_1:1234567
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_2);
        msg.add(buffer.data(), buffer.size());
        cout << "header_2:" << header_2 << endl;  // header_2:89
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, data);
        msg.add_raw(buffer.data(), buffer.size());
        std::cout << "data: " << data << std::endl;  // data:[1.2 3.4 5.6]
    }

    std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"... why ?
    publisherSock.send(msg);

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

購読者

zmqpp::socket subscriberSock;
/* connection setup stuff ...*/

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
int64_t header_2;
std::vector<double> data;

std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"
{
    // header 1
    {
        msgpack::unpacked unpackedData;
        // crash !
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(0)),
                        msg.size(0));
        unpackedData.get().convert(&header_1);
        cout << "header_1:" << header_1 << endl;
    }
    // header 2
    {
        msgpack::unpacked unpackedData;
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(1)),
                        msg.size(1));
        unpackedData.get().convert(&header_2);
        cout << "header_2:" << header_2 << endl;
    }
    // data
    {
        msgpack::unpacked unpacked_data;
        msgpack::unpack(unpacked_data,
                        static_cast<const char*>(msg.raw_data(2)),
                        msg.size(2));
        unpacked_data.get().convert(&data);
        std::cout << "data:" << data << std::endl;
    }

}

編集:問題解決: @Jens が指摘したように、データをパッキング/送信する正しい方法は、zmqpp::message::add_raw()

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
4

1 に答える 1