目標: 1 回の反復ですべてのサブスクライバーに多くのメッセージを送信します。40k-100k のメッセージがあります。PUB/SUBソケットタイプを使い始めました。
問題: サブスクライバーで受信されたメッセージの数が、パブリッシャーで送信されたメッセージの数よりも少なくなっています。各メッセージの送信後に Thread.Sleep(1) を追加すると、すべてのメッセージが配信されますが、配信する必要があるメッセージの数が多いため、40 ~ 100 秒の遅延が発生します。これは受け入れられません。
以下のコードは、アルファ ビルドである NetMQ (3.0.0) にありますが、libzmq 3.2.4 (安定版) を使用して c で同じコードを実装したため、これは単なる例です。そして、シンプトムは同じです。
パブリッシャー/サーバー側:
using (var dbConn = new OracleConnection(ConfigurationManager.AppSettings["ConnString"]))
using (NetMQContext ctx = NetMQContext.Create())
{
using (var publisher = ctx.CreatePublisherSocket())
{
publisher.Bind(ConfigurationManager.AppSettings["PubSocket"]);
dbConn.Open();
NetMQMessage m = new NetMQMessage();
while (true)
{
var updateIds = new List<int>();
var deletedIds = new List<int>();
var changedRules = GetChangedItems(dbConn, ref updateIds);
var deletedRules = GetDeletedItems(dbConn, ref deletedIds);
foreach (var kvPair in changedRules)
{
var item= kvPair.Value;
publisher.Send(ToCsvLine(item));
//Thread.Sleep(1);
}
foreach (var kvPair in deletedRules)
{
var item = kvPair.Value;
publisher.Send(ToCsvLine(item));
//Thread.Sleep(1);
}
Thread.Sleep(1);
publisher.Send("end");
Console.WriteLine("Sent updated: {0}", updateIds.Count);
Console.WriteLine("Sent deleted: {0}", deletedIds.Count);
Thread.Sleep(6000);
}
}
サブスクライバー/クライアント側:
using (NetMQContext ctx = NetMQContext.Create())
{
using (var consumer = ctx.CreateSubscriberSocket())
{
consumer.Connect("tcp://192.168.1.122:6005");
consumer.Subscribe("");
int count = 0;
while (true)
{
try
{
count++;
string msg = consumer.ReceiveString();
if (msg == "end")
{
Console.WriteLine("Count: {0}", count);
count = 0;
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.ReadLine();
}
}
}
}