25

一部のメッセージングにZeroMQを使用しているC++アプリケーションがあります。ただし、AJAX/CometベースのWebサービスにSGCI接続を提供する必要もあります。

このためには、通常のTCPソケットが必要です。私は通常のPosixソケットでそれを行うことができましたが、クロスプラットフォームのポータブルを維持し、私の生活を楽にするために(私は願っています...)私はBoost::ASIOを使用することを考えていました。

zmq_poll()しかし今、私はそれ自身とASIOを使いたいと思っているZMQの衝突がありio_service.run()ます...

ASIOを0MQと連携させる方法はありzmq_poll()ますか?

または、そのような設定を実現するための他の推奨される方法はありますか?

注:複数のスレッドを使用することでそれを解決できますが、SCGIトラフィックの量が非常に少ないプログラムを実行するのは小さなシングルコア/ CPUボックスであるため、マルチスレッドはリソースの浪費になります...

4

5 に答える 5

16

ここここ、特にこの段落のドキュメントを読んだ後

ZMQ_FD:ソケットに関連付けられたファイル記述子を取得するZMQ_FDオプションは、指定されたソケットに関連付けられたファイル記述子を取得する必要があります。返されたファイル記述子を使用して、ソケットを既存のイベントループに統合できます。ØMQライブラリは、ファイル記述子を読み取り可能にすることにより、エッジトリガー方式でソケット上の保留中のイベントを通知します。

null_buffersすべてzmq_pollitem_tに使用して、イベントループをio_service完全にバイパスするまで延期できると思いますzmq_poll()。前述のドキュメントにはいくつかの注意点があるようですが、特に

返されたファイル記述子から読み取る機能は、メッセージが基になるソケットから読み取れる、または書き込むことができることを必ずしも示しているわけではありません。アプリケーションは、実際のイベント状態を取得してから、ZMQ_EVENTSオプションを取得する必要があります。

したがって、zmqソケットの1つのハンドラーが起動された場合、イベントを処理する前にもう少し作業を行う必要があると思います。コンパイルされていない擬似コードは以下のとおりです

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error)
    {
       if (!error) {
           // handle data ready to be read
       }
     }
);

ここではラムダを使用する必要はないことに注意してくださいboost::bind。メンバー関数に十分です。

于 2012-10-11T22:13:27.300 に答える
2

結局、私は2つの可能な解決策があることを理解しました:

  • ASIOのイベントループを使用するSamMiller
  • およびの.native()メソッドを介してASIOファイル記述子を取得し、それらをの配列に挿入することによるZeroMQのイベントループacceptorsocketzmq_pollitem_t

常に新しい接続が作成されて終了するSCGIの場合の最良の解決策であるため、SamMillerの回答を受け入れました。このように変化するすべてのzmq_pollitem_t配列を処理することは、ASIOイベントループを使用することで回避できる大きな手間です。

于 2012-10-14T16:09:49.067 に答える
2

ZeroMQへのソケットを取得することは、戦いの最小の部分です。ZeroMQは、TCP上に階層化されたプロトコルに基づいているため、このルートを使用する場合は、カスタムBoost.Asioio_service内でZeroMQを再実装する必要があります。Boost.Asioを使用して非同期ENetサービスを作成するときに、最初にBoost.Asio UDPサービスを使用してENetクライアントからのトラフィックをキャッチしようとしたときに、同じ問題が発生しました。ENetはUDP上に階層化されたTCPのようなプロトコルであるため、その時点で達成したのは、事実上役に立たない状態のパケットをキャッチすることだけでした。

Boost.Asioはテンプレートベースであり、組み込みのio_serviceはテンプレートを使用して、基本的にシステムソケットライブラリをラップし、TCPおよびUDPサービスを作成します。私の最終的な解決策は、システムソケットライブラリではなくENetライブラリをラップするカスタムio_serviceを作成し、組み込みのUDPトランスポートを使用してENetのトランスポート関数を再実装するのではなく、使用できるようにすることでした。

ZeroMQについても同じことができますが、ZeroMQは、それ自体がすでに非常に高性能なネットワークライブラリであり、すでに非同期I/Oを提供しています。ZeroMQの既存のAPIを使用してメッセージを受信し、そのメッセージをio_serviceスレッドプールに渡すことで、実行可能なソリューションを作成できると思います。そうすれば、メッセージ/タスクは、何も書き直さなくても、Boost.Asioのreactorパターンを使用して非同期で処理されます。ZeroMQは非同期I/Oを提供し、Boost.Asioは非同期タスクハンドラー/ワーカーを提供します。

既存のio_serviceは、既存のTCPソケットにも結合できるため、スレッドプールはTCP(この場合はHTTP)とZeroMQの両方を処理できます。このような設定では、ZeroMQタスクハンドラーがTCPサービスセッションオブジェクトにアクセスすることが完全に可能であり、ZeroMQメッセージ/タスクの結果をTCPクライアントに送り返すことができます。

以下は、概念を説明するためのものです。

// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (1) {
    char buffer [10];
    zmq_recv (responder_, buffer, 10, 0);
    io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
于 2014-02-10T05:44:01.610 に答える
2

この質問の2年後、誰かがまさにこれを行うプロジェクトを投稿しました。プロジェクトはここにあります:https ://github.com/zeromq/azmq 。デザインについて説明しているブログ投稿はこちらです:https ://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/ 。

readmeからコピーしたサンプルコードは次のとおりです。

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>

namespace asio = boost::asio;

int main(int argc, char** argv) {
    asio::io_service ios;
    azmq::sub_socket subscriber(ios);
    subscriber.connect("tcp://192.168.55.112:5556");
    subscriber.connect("tcp://192.168.55.201:7721");
    subscriber.set_option(azmq::socket::subscribe("NASDAQ"));

    azmq::pub_socket publisher(ios);
    publisher.bind("ipc://nasdaq-feed");

    std::array<char, 256> buf;
    for (;;) {
        auto size = subscriber.receive(asio::buffer(buf));
        publisher.send(asio::buffer(buf));
    }
    return 0;
}

いい感じ。試してみたら、2019年でも機能するかどうかコメントで知らせてください[おそらく数か月以内に試してから、この回答を更新します](リポジトリは古く、最後のコミットは1年前でした)

于 2019-04-04T14:46:33.353 に答える
0

解決策は、run()の代わりにio_serviceもポーリングすることです。

いくつかのpoll()情報については、このソリューションを確認してください。

runの代わりにpollを使用すると、ブロッキングの問題なしにzmqの接続をポーリングできます。

于 2012-10-11T12:58:35.330 に答える