2

目標: 1 回の反復ですべてのサブスクライバーに多くのメッセージを送信します。40k-100k のメッセージがあります。PUB/SUBソケットタイプを使い始めました。

問題: サブスクライバーで受信されたメッセージの数が、パブリッシャーで送信されたメッセージの数よりも少なくなっています。各メッセージの送信後に Thread.Sleep(1) を追加すると、すべてのメッセージが配信されますが、配信する必要があるメッセージの数が多いため、40 ~ 100 秒の遅延が発生します。これは受け入れられません。

以下のコードは、アルファ ビルドである NetMQ (3.0.0) にありますが、libzmq 3.2.4 (安定版) を使用して c で同じコードを実装したため、これは単なる例です。そして、シンプトムは同じです。

パブリッシャー/サーバー側:

using (var dbConn = new OracleConnection(ConfigurationManager.AppSettings["ConnString"]))
using (NetMQContext ctx = NetMQContext.Create())
{
  using (var publisher = ctx.CreatePublisherSocket())
  {
    publisher.Bind(ConfigurationManager.AppSettings["PubSocket"]);
    dbConn.Open();
    NetMQMessage m = new NetMQMessage();
    while (true)
    {
      var updateIds = new List<int>();
      var deletedIds = new List<int>();

      var changedRules = GetChangedItems(dbConn, ref updateIds);
      var deletedRules = GetDeletedItems(dbConn, ref deletedIds);

      foreach (var kvPair in changedRules)
      {
        var item= kvPair.Value;
        publisher.Send(ToCsvLine(item));
        //Thread.Sleep(1);
      }

      foreach (var kvPair in deletedRules)
      {
        var item = kvPair.Value;

        publisher.Send(ToCsvLine(item));
        //Thread.Sleep(1);
      }
      Thread.Sleep(1);
      publisher.Send("end");

      Console.WriteLine("Sent updated: {0}", updateIds.Count);
      Console.WriteLine("Sent deleted: {0}", deletedIds.Count);
      Thread.Sleep(6000);
    }
  }

サブスクライバー/クライアント側:

using (NetMQContext ctx = NetMQContext.Create())
{
    using (var consumer = ctx.CreateSubscriberSocket())
    {
      consumer.Connect("tcp://192.168.1.122:6005");
      consumer.Subscribe("");

      int count = 0;
      while (true)
      {
        try
        {
          count++;
          string msg = consumer.ReceiveString();
          if (msg == "end")
          {
            Console.WriteLine("Count: {0}", count);
            count = 0;
          }
        }
        catch (Exception ex)
        {
          Console.WriteLine(ex.Message);
          Console.ReadLine();
        }
      }
    }
}
4

1 に答える 1

5

ZMQ は、デフォルトでハイ ウォーター マークを 1000 に設定します。また、100%正確ではありません。1 回のバッチで少なくともその半分に達するか、それを超えることができると想定できますか? もしそうなら、それはおそらくあなたの問題です。HWM を 10,000 または 50,000 に高く設定し (または、テスト目的で、0 に設定してオフにします)、結果がどのように変化するかを確認します。

于 2014-06-02T17:55:22.230 に答える