1

.NET 4.5 で NetMQ (Nuget 3.3.2.2) を使用しており、PUSH ソケットを使用する単一の高速ジェネレーター プロセスと、PULL ソケットを使用する単一の低速コンシューマー プロセスがあります。送信側 HWM に到達するのに十分な数のメッセージを送信すると、送信側プロセスはスレッドを無期限にブロックします。

問題を説明するいくつかの不自然な(ジェネレーター)コード:

        using (var ctx = NetMQContext.Create())
        using (var pushSocket = ctx.CreatePushSocket())
        {
            pushSocket.Connect("tcp://127.0.0.1:42404");

            var template = GenerateMessageBody(i);
            for (int i = 1; i <= 100000; i++)
            {
                pushSocket.SendMoreFrame("SampleMessage").SendFrame(Messages.SerializeToByteArray(template));

                if (i % 1000 == 0)
                    Console.WriteLine("Sent " + i + " messages");
            }
            Console.WriteLine("All finished");
            Console.ReadKey();
        }

私の構成では、これは通常、約 5000 または 6000 のメッセージを送信したことを報告し、その後単純にブロックします。送信 HWM セットを大きな値 (または 0) に設定すると、期待どおりにすべてのメッセージが送信されます。

ここでは、再試行する前に別のコマンドを受信するのを待っているようです: (SocketBase.TrySend)

        // Oops, we couldn't send the message. Wait for the next
        // command, process it and try to send the message again.
        // If timeout is reached in the meantime, return EAGAIN.
        while (true)
        {
            ProcessCommands(timeoutMillis, false);

私が 0MQ ガイドで読んだことから、完全な PUSH ソケットでのブロックは正しい動作です (そして私がやりたいことです) が、消費者がそのキューをクリアすると回復することが期待されます。

ある種の TrySend パターンを使用して自分でブロックを処理する以外に、PUSH ソケットにブロックされたメッセージを定期的に再送信させるために設定できるオプションやその他の機能はありますか?

4

0 に答える 0