私はZeroMQでしばらく遊んでいて、思いついたいくつかの質問/問題があります. ZeroMQ への貢献者、またはライブラリを使用したことがある、または現在使用している誰かが参加してくれれば幸いです。
*1 つのルーター/フォワーダーと 2 つの異なるクライアント (c1、c2) があるとします。ルーティング デバイスを介して client1 から client2 にメッセージをプッシュしたいと考えています。ルーターは、クライアント (ここでは client1) からメッセージをプルし、サブスクライブしているクライアント (ここでは client2) にそれらをパブリッシュします。現在、このようなメッセージを適切なクライアントにルーティングする唯一の方法は pub/sub を使用することですが、a) メッセージ本文とともに routingTo タグを送信することで実行時にルーティングする方法を決定したい、b) プッシュを使用したい/pull を使用してクライアントに転送します。pub/sub ではなく、ハイ ウォーター マーク プロパティを設定するときにブロック機能を実装したいからです。pub/sub を使用する必要がないようにルーター側で何らかの変更を加えることはできますか? または、メッセージが転送されるはずのルーティング側でわかっていても、クライアントにルーティングする唯一の方法は pub/sub ですか? キューのサイズが必要のない hwm を超えると、pub/sub がメッセージをドロップすることを読みました。また、応答が必要ないため不要なオーバーヘッドが追加されるため、要求/応答パターンを実装したくありません。
*以下のコード (Push/Pull -> Pub/Sub) を実行し、すべてのメッセージを送信し、すべてのメッセージが受信されたという確認を受け取った後、メッセージをプッシュしたクライアントは依然として巨大なメモリ フットプリントを表示します。 Push ソケットのキュー。それはなぜですか、それを修正するにはどうすればよいですか?
これが私のコードです:
ルーター:
class Program
{
static void Main(string[] args)
{
using (var context = new Context(1))
{
using (Socket socketIn = context.Socket(SocketType.PULL), socketOut = context.Socket(SocketType.XPUB))
{
socketIn.HWM = 10000;
socketOut.Bind("tcp://*:5560"); //forwards on this port
socketIn.Bind("tcp://*:5559"); //listens on this port
Console.WriteLine("Router started and running...");
while (true)
{
//Receive Message
byte[] address = socketIn.Recv();
byte[] body = socketIn.Recv();
//Forward Message
socketOut.SendMore(address);
socketOut.Send(body);
}
}
}
}
}
クライアント1:
class Program
{
static void Main(string[] args)
{
using (var context = new Context(1))
{
using (Socket socketIn = context.Socket(SocketType.SUB), socketOut= context.Socket(SocketType.PUSH))
{
byte[] iAM = Encoding.Unicode.GetBytes("Client1");
byte[] youAre = Encoding.Unicode.GetBytes("Client2");
byte[] msgBody = new byte[16];
socketOut.HWM = 10000;
socketOut.Connect("tcp://localhost:5559");
socketIn.Connect("tcp://localhost:5560");
socketIn.Subscribe(iAM);
Console.WriteLine("Press key to kick off Test Client1 Sending Routine");
Console.ReadLine();
for (int counter = 1; counter <= 10000000; counter++)
{
//Send Message
socketOut.SendMore(youAre);
socketOut.Send(msgBody);
}
Console.WriteLine("Client1: Finished Sending");
Console.ReadLine();
}
}
}
}
クライアント 2:
class Program
{
public static int msgCounter;
static void Main(string[] args)
{
msgCounter = 0;
using (var context = new Context(1))
{
using (Socket socketIn = context.Socket(SocketType.SUB), socketOut = context.Socket(SocketType.PUSH))
{
byte[] iAM = Encoding.Unicode.GetBytes("Client2");
socketOut.Connect("tcp://localhost:5559");
socketIn.Connect("tcp://localhost:5560");
socketIn.Subscribe(iAM);
Console.WriteLine("Client2: Started Listening");
//Receive First Message
byte[] address = socketIn.Recv();
byte[] body = socketIn.Recv();
msgCounter += 1;
Console.WriteLine("Received first message");
Stopwatch watch = new Stopwatch();
watch.Start();
while (msgCounter < 10000000)
{
//Receive Message
address = socketIn.Recv();
body = socketIn.Recv();
msgCounter += 1;
}
watch.Stop();
Console.WriteLine("Elapsed Time: " + watch.ElapsedMilliseconds + "ms");
Console.ReadLine();
}
}
}
}