1

SO に関する以前の質問に基づいて、(REQ/REP モデルではなく) DEALER/ROUTER モデルを使用してパフォーマンスを最大化することを提案し、次のクライアントとサーバー コードをセットアップしました。

クライアントの asynccli.c ソースは 8 つのスレッドを起動し、それぞれが zmq TCP ソケットで送受信します。サーバー asyncsrv.c は 4 つのワーカー スレッドを起動し、プロキシを使用して着信要求をワーカー スレッドに分散します。

10 秒間のテストでは、40,000 メッセージから 120,000 メッセージまでの範囲のパフォーマンスを経験しました。8GBのメモリを搭載したi7(8HTコア)ラップトップでUbuntuを実行しています。czmq ライブラリを使用します。

ZeroMQ で 200,000 メッセージ/秒以上を達成できると思いました。「非同期」のことを正しくキャッチできなかったと思います。周りの C サンプル コードはありますか? 基本的に、私は現在 zmq_poll() をここで実行しているため、非同期のものを取得する方法がわかりません。

asynccli.c:

// results : 4000/s
#include "czmq.h"
int id = 0;

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_DEALER);

    char identity [10];

    sprintf (identity, "%d", id);
    zsockopt_set_identity (client, identity);
    zsocket_connect (client, "tcp://localhost:5570");

    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    int request_nbr = 0;
    while (true) {
        //  Tick once per second, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 100; centitick++) {
            zmq_poll (items, 1, 1);
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (client);
                //zframe_print (zmsg_last (msg), identity);
                zmsg_destroy (&msg);
                break;
            }
        }

        id+=1;
        zstr_send (client, "request #%d", ++request_nbr);
    }
    zctx_destroy (&ctx);
    return NULL;
}

//  The main thread simply starts several clients and a server, and then
//  waits for the server to finish.

int main (void)
{

    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);

    zclock_sleep (10 * 1000);    //  Run for 10 seconds then quit
    printf ("\\ntotal iterations = %d\n" , id );
    return 0;
}

asyncsrv.c:

#include "czmq.h"

static void server_worker (void *args, zctx_t *ctx, void *pipe);

void *server_task (void *args)
{
    //  Frontend socket talks to clients over TCP
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5570");

    //  Backend socket talks to workers over inproc
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "inproc://backend");

    //  Launch pool of worker threads, precise number is not critical
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 3; thread_nbr++)
        zthread_fork (ctx, server_worker, NULL);

    //  Connect backend to frontend via a proxy
    zmq_proxy (frontend, backend, NULL);

    zctx_destroy (&ctx);
    return NULL;
}

static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "inproc://backend");

    while (true) {
        //  The DEALER socket gives us the reply envelope and message
        zmsg_t *msg = zmsg_recv (worker);
        zframe_t *identity = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        zmsg_destroy (&msg);

        //  Sleep for some fraction of a second
        zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
        zframe_send (&content, worker, ZFRAME_REUSE);

        zframe_destroy (&identity);
        zframe_destroy (&content);
    }
}

int main (void)
{
    zthread_new (server_task, NULL);
    zclock_sleep (15 * 1000);    //  Run for 15 seconds then quit
    return 0;
}
4

2 に答える 2

3

問題は、コード内で「送信」の前に「読み取り」を行うことにより、クライアントの送信機能が「制限」されていることです。

現在、クライアントのコードは次のとおりです。

while(true)
{
  pull_any_income_messages()
  send()
}

これにより、保留中の着信メッセージがある場合、クライアントは何も送信できなくなります。したがって、これは本質的に要求と応答のパターンになります。

これをスケーリングするには、「受信メッセージのプル」部分と「送信」部分を分離する必要があります。1 つの方法として、送信と受信の両方を処理するユニバーサル クライアント スレッドを用意する代わりに、クライアント用に 1 つは送信専用、もう 1 つは読み取り専用の 2 つの個別のスレッド タイプを作成することが考えられます。

別のアプローチは、「クレジットベースのフロー制御」を行うことです。ZMQ ガイドの第 7 章に情報があります ( http://zguide.zeromq.org/page:all#Chapter-Advanced-Architecture-using-MQ )。

-GK

http://gk.palem.in/

于 2014-04-23T09:43:00.413 に答える