zmqppを使用して PUB/SUB 接続を作成し、ヘッダーのみの C++11 バージョンのmsgpack-cを使用してパブリッシャーからサブスクライバーにデータを送信したいと考えています。
int64_tサイト運営者は 2 つの数字 --header_1とheader_2-- の後にstd::vector<T>-- --を送信する必要があります。dataここで、は組み合わせTによって決まります。(header_1, header_2)
msgpack と zmqpp を組み合わせる方法の例はそれほど多くないので、私が思いついたアイデアは、 を使用して 3 部構成のメッセージを送信することzmqpp::message::add/add_rawです。各パーツは、msgpack を使用してパック/アンパックされます。
パブリッシャーは、次のように 1 つのデータ パーツをパックします。
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
そして、レシーバーは次のように展開します。
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
コードを実行すると、サブスクライバー側で次のエラーが発生します。
terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what(): insufficient bytes
Aborted
また、zmqpp はadd()3 回しか呼び出していないにもかかわらず、5 部構成のメッセージを生成したようです。
Q1: データを正しくパック/アンパックしていますか?
Q2: これは、 zmqpp を使用して msgpack バッファーを送信するための適切な方法ですか?
コードの重要な部分は次のとおりです。
出版社
zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
zmqpp::message msg;
// meta info about the data
int64_t header_1 = 1234567;
int64_t header_2 = 89;
// sample data
std::vector<double> data;
data.push_back(1.2);
data.push_back(3.4);
data.push_back(5.6);
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
cout << "header_1:" << header_1 << endl; // header_1:1234567
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_2);
msg.add(buffer.data(), buffer.size());
cout << "header_2:" << header_2 << endl; // header_2:89
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
msg.add_raw(buffer.data(), buffer.size());
std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6]
}
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ?
publisherSock.send(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
購読者
zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"
{
// header 1
{
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
cout << "header_1:" << header_1 << endl;
}
// header 2
{
msgpack::unpacked unpackedData;
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(1)),
msg.size(1));
unpackedData.get().convert(&header_2);
cout << "header_2:" << header_2 << endl;
}
// data
{
msgpack::unpacked unpacked_data;
msgpack::unpack(unpacked_data,
static_cast<const char*>(msg.raw_data(2)),
msg.size(2));
unpacked_data.get().convert(&data);
std::cout << "data:" << data << std::endl;
}
}
編集:問題解決: @Jens が指摘したように、データをパッキング/送信する正しい方法は、zmqpp::message::add_raw()
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());