1

これはフォーラムへの最初の投稿であり、助けが必要です。

ここhttps://github.com/egalli64/thisthread/blob/master/zmq/d2r.cppに記載されているものとほぼ同じ ZeroMQ サーバー部分を実装しました 。

以下は私のコードです

#include <string>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/random.hpp>
#include "zmq2.h"
#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <zmq.hpp>

boost::mutex mio;

void *worker(void *arg)
{
    std::cout << "in worker" << std::endl;
    zmq::context_t *context = (zmq::context_t *) arg;
    zmq::Socket skWorker(*context, ZMQ_DEALER);
    std::cout << "in worker2" << std::endl;
    skWorker.connect("inproc://backend");

    std::cout<< "Worker started" << std::endl;

    while(true)
    {
    std::cout << "in while" << std::endl;
    zmq::Frames frames = skWorker.blockingRecv(2, false);
    if(frames.size() == 1)
    {
        std::cout << "in if" << std::endl;
        break;
    }
// send a few replies

    for(int i =0; i < 10; ++i)
    {
        boost::this_thread::sleep(boost::posix_time::millisec(100 * 10));
        std::cout << "worker reply" << std::endl;
        skWorker.send(frames);
    }
    }

    std::cout << "terminating worker" << std::endl;
    return (NULL);
}


 void dumpId(const char* tag, const std::string& msg)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());

    std::cout << id << ' ' << tag << ' ' << msg << std::endl;
}

int main()
{
    zmq::context_t context(1);
    zmq::Socket client(context, ZMQ_ROUTER);
    client.bind("tcp://*:5555");

    zmq::Socket workers(context, ZMQ_DEALER);
    workers.bind("inproc://workers");

    for (int thread_nbr = 0; thread_nbr != 2; thread_nbr++)
    {
    pthread_t threads;
    pthread_create(&threads, NULL, worker, (void *) &context);
    }
    std::cout << " server started" << std::endl;

    zmq_pollitem_t items [] =
    {
    { client, 0, ZMQ_POLLIN, 0 },
    { workers, 0, ZMQ_POLLIN, 0 }
    };

    while(true)
    {
    if(zmq_poll(items, 2, 3000 * 1000) < 1) // 3 secs
        break;

    if(items[0].revents & ZMQ_POLLIN)
    {
        zmq::Frames frames = client.blockingRecv(2);
        dumpId("Received on frontend", frames[0]);
        workers.send(frames);
        items[0].revents = 0; // cleanup
    }
    if(items[1].revents & ZMQ_POLLIN)
    {
        zmq::Frames frames = workers.blockingRecv(2);
        dumpId("Received on backend", frames[0]);
        client.send(frames);
        items[1].revents = 0; // cleanup
    }
    }

    std::cout <<"Nothing happened for a while" << std::endl;

// send a terminator to each worker
    for(int i = 0; i < 2; ++i)
   workers.send("");


    std::cout << "Server shutdown" << std::endl;
}

コアダンプを実行した後。

以下は出力です

in workerin worker サーバーが起動しました

'zmq::error_t' のインスタンスをスローした後に呼び出されます。 再帰的に呼び出されます。 中止 (コア ダンプ)

誰かが私がどこで間違っているのか教えてもらえますか....... よろしくお願いします....

4

0 に答える 0