SO に関する以前の質問に基づいて、(REQ/REP モデルではなく) DEALER/ROUTER モデルを使用してパフォーマンスを最大化することを提案し、次のクライアントとサーバー コードをセットアップしました。
クライアントの asynccli.c ソースは 8 つのスレッドを起動し、それぞれが zmq TCP ソケットで送受信します。サーバー asyncsrv.c は 4 つのワーカー スレッドを起動し、プロキシを使用して着信要求をワーカー スレッドに分散します。
10 秒間のテストでは、40,000 メッセージから 120,000 メッセージまでの範囲のパフォーマンスを経験しました。8GBのメモリを搭載したi7(8HTコア)ラップトップでUbuntuを実行しています。czmq ライブラリを使用します。
ZeroMQ で 200,000 メッセージ/秒以上を達成できると思いました。「非同期」のことを正しくキャッチできなかったと思います。周りの C サンプル コードはありますか? 基本的に、私は現在 zmq_poll() をここで実行しているため、非同期のものを取得する方法がわかりません。
asynccli.c:
// results : 4000/s
#include "czmq.h"
int id = 0;
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_DEALER);
char identity [10];
sprintf (identity, "%d", id);
zsockopt_set_identity (client, identity);
zsocket_connect (client, "tcp://localhost:5570");
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0;
while (true) {
// Tick once per second, pulling in arriving messages
int centitick;
for (centitick = 0; centitick < 100; centitick++) {
zmq_poll (items, 1, 1);
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (client);
//zframe_print (zmsg_last (msg), identity);
zmsg_destroy (&msg);
break;
}
}
id+=1;
zstr_send (client, "request #%d", ++request_nbr);
}
zctx_destroy (&ctx);
return NULL;
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int main (void)
{
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zclock_sleep (10 * 1000); // Run for 10 seconds then quit
printf ("\\ntotal iterations = %d\n" , id );
return 0;
}
asyncsrv.c:
#include "czmq.h"
static void server_worker (void *args, zctx_t *ctx, void *pipe);
void *server_task (void *args)
{
// Frontend socket talks to clients over TCP
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5570");
// Backend socket talks to workers over inproc
void *backend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (backend, "inproc://backend");
// Launch pool of worker threads, precise number is not critical
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 3; thread_nbr++)
zthread_fork (ctx, server_worker, NULL);
// Connect backend to frontend via a proxy
zmq_proxy (frontend, backend, NULL);
zctx_destroy (&ctx);
return NULL;
}
static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "inproc://backend");
while (true) {
// The DEALER socket gives us the reply envelope and message
zmsg_t *msg = zmsg_recv (worker);
zframe_t *identity = zmsg_pop (msg);
zframe_t *content = zmsg_pop (msg);
assert (content);
zmsg_destroy (&msg);
// Sleep for some fraction of a second
zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send (&content, worker, ZFRAME_REUSE);
zframe_destroy (&identity);
zframe_destroy (&content);
}
}
int main (void)
{
zthread_new (server_task, NULL);
zclock_sleep (15 * 1000); // Run for 15 seconds then quit
return 0;
}