-1

私はZeroMQでしばらく遊んでいて、思いついたいくつかの質問/問題があります. ZeroMQ への貢献者、またはライブラリを使用したことがある、または現在使用している誰かが参加してくれれば幸いです。

*1 つのルーター/フォワーダーと 2 つの異なるクライアント (c1、c2) があるとします。ルーティング デバイスを介して client1 から client2 にメッセージをプッシュしたいと考えています。ルーターは、クライアント (ここでは client1) からメッセージをプルし、サブスクライブしているクライアント (ここでは client2) にそれらをパブリッシュします。現在、このようなメッセージを適切なクライアントにルーティングする唯一の方法は pub/sub を使用することですが、a) メッセージ本文とともに routingTo タグを送信することで実行時にルーティングする方法を決定したい、b) プッシュを使用したい/pull を使用してクライアントに転送します。pub/sub ではなく、ハイ ウォーター マーク プロパティを設定するときにブロック機能を実装したいからです。pub/sub を使用する必要がないようにルーター側で何らかの変更を加えることはできますか? または、メッセージが転送されるはずのルーティング側でわかっていても、クライアントにルーティングする唯一の方法は pub/sub ですか? キューのサイズが必要のない hwm を超えると、pub/sub がメッセージをドロップすることを読みました。また、応答が必要ないため不要なオーバーヘッドが追加されるため、要求/応答パターンを実装したくありません。

*以下のコード (Push/Pull -> Pub/Sub) を実行し、すべてのメッセージを送信し、すべてのメッセージが受信されたという確認を受け取った後、メッセージをプッシュしたクライアントは依然として巨大なメモリ フットプリントを表示します。 Push ソケットのキュー。それはなぜですか、それを修正するにはどうすればよいですか?

これが私のコードです:

ルーター:

class Program
{
    static void Main(string[] args)
    {
        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.PULL), socketOut = context.Socket(SocketType.XPUB))
            {
                socketIn.HWM = 10000;
                socketOut.Bind("tcp://*:5560"); //forwards on this port
                socketIn.Bind("tcp://*:5559"); //listens on this port

                Console.WriteLine("Router started and running...");

                while (true)
                {
                    //Receive Message
                    byte[] address = socketIn.Recv();
                    byte[] body = socketIn.Recv();

                    //Forward Message
                    socketOut.SendMore(address);
                    socketOut.Send(body);
                }
            }
        }
    }
}

クライアント1:

class Program
{
    static void Main(string[] args)
    {
        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.SUB), socketOut= context.Socket(SocketType.PUSH))
            {
                byte[] iAM = Encoding.Unicode.GetBytes("Client1");
                byte[] youAre = Encoding.Unicode.GetBytes("Client2");
                byte[] msgBody = new byte[16];

                socketOut.HWM = 10000;
                socketOut.Connect("tcp://localhost:5559");
                socketIn.Connect("tcp://localhost:5560");
                socketIn.Subscribe(iAM);

                Console.WriteLine("Press key to kick off Test Client1 Sending Routine");
                Console.ReadLine();

                for (int counter = 1; counter <= 10000000; counter++)
                {
                    //Send Message
                    socketOut.SendMore(youAre);
                    socketOut.Send(msgBody);
                }

                Console.WriteLine("Client1: Finished Sending");
                Console.ReadLine();
            }
        }
    }
}

クライアント 2:

class Program
{
    public static int msgCounter;

    static void Main(string[] args)
    {
        msgCounter = 0;

        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.SUB), socketOut = context.Socket(SocketType.PUSH))
            {
                byte[] iAM = Encoding.Unicode.GetBytes("Client2");

                socketOut.Connect("tcp://localhost:5559");
                socketIn.Connect("tcp://localhost:5560");
                socketIn.Subscribe(iAM);

                Console.WriteLine("Client2: Started Listening");

                //Receive First Message
                byte[] address = socketIn.Recv();
                byte[] body = socketIn.Recv();
                msgCounter += 1;

                Console.WriteLine("Received first message");

                Stopwatch watch = new Stopwatch();
                watch.Start();

                while (msgCounter < 10000000)
                {
                    //Receive Message
                    address = socketIn.Recv();
                    body = socketIn.Recv();
                    msgCounter += 1;
                }

                watch.Stop();
                Console.WriteLine("Elapsed Time: " + watch.ElapsedMilliseconds + "ms");
                Console.ReadLine();
            }
        }
    }
}
4

1 に答える 1

2

ここでは、アーキテクチャが少しずれている可能性があることをお勧めします。

1) PUSH が 1 回、PULL が 1 回だけ必要な場合は、デバイスを中央から取り外します。ノードを追加するたびにプロデューサーを更新する必要がないように、複数のコンシューマーを管理するためにデバイスがアーキテクチャに明示的に追加されます。複数のコンシューマーやプロデューサーが必要な場合は、デバイスの各ノードへの接続が必要になります。それがまさにそのように機能します。この場合、デバイスがソリューションを過度に複雑にしているように聞こえます。

2) "route to" タグを付けるというアイデアは、本当に頭がおかしくなりました。おそらく、他の統合オプションよりもメッセージングを選択する最大の理由は、プロデューサーとコンシューマーを分離して、どちらの側も相手について何も知る必要がないようにすることです (ブローカーレス設計の場合にメッセージを送信する場所以外)。ルーティング情報をロジックに直接追加すると、これが壊れます。

オーバーヘッドに関しては、私はこれを経験したことがありません。しかし、私は以前に ZeroMQ 用の .Net ドライバーを使用したことがないため、.Net ドライバー自体を調べることは無知な推測になります。

于 2012-04-22T18:23:22.530 に答える