1

私はNetMQ v4でディーラー <--> ルーターをセットアップしており、どの方向でも問題なくメッセージを非同期に送受信できます。

ここで、サーバー ( Router ) が着信メッセージをリッスンする抽象化に形式化したいと思いますが、接続されたクライアント ( Dealers )にオンデマンドでメッセージをブロードキャストする必要もあります。

サブスクライバーがサーバーにもメッセージを送信する必要があるため、Pub <--> Sub ソケットの使用を避けようとしています。私が達成しようとしていることに最も近いパターンは、WebSocket クライアント サーバー通信です。

クライアント メッセージをリッスンする最初の部分は、次のように行われます。

using (var server = new RouterSocket("@tcp://*:80"))
{
    var addresses = new HashSet<string>();
    while (true)
    {
        var msg = server.ReceiveMultipartMessage();

        var address = Encoding.UTF8.GetString(msg[0].Buffer);
        var payload = Encoding.UTF8.GetString(msg[2].Buffer);
        Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);

        var contains = addresses.Contains(address);
        if (!contains) { addresses.Add(address); }            

        msg.Clear();
        msg.Append(address);
        msg.AppendEmptyFrame();
        msg.Append("Reply for: " + address);
        server.SendMultipartMessage(msg);
    }
}

ソケットがスレッドセーフではないことを考えると、すべてのクライアントに (オンデマンドで別のスレッドからの) メッセージをブロードキャストする方法を見つけることに行き詰まっています。

代わりにタイムアウトを設定してループ内でメソッドを使用するTryReceiveMultipartMessageと、ブロードキャスト メッセージのキューをチェックし、そのようなメッセージを送信する各クライアントをループできます。何かのようなもの:

using (var server = new RouterSocket("@tcp://*:80"))
{
    var addresses = new HashSet<string>();

    var msg = new NetMQMessage();
    while (true)
    {
        var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg);
        if (!clientHasMsg)
        {
            // Check any incoming broacast then loop through all the clients
            // sending each the brodcast msg
            var broadMsg = new NetMQMessage();
            foreach (var item in addresses)
            {
                broadMsg.Append(item);
                broadMsg.AppendEmptyFrame();
                broadMsg.Append("This is a broadcast");
                server.SendMultipartMessage(broadMsg);
                broadMsg.Clear();
            }

            // Go back into the loop waiting for client messages
            continue;
        }

        var address = Encoding.UTF8.GetString(msg[0].Buffer);
        var payload = Encoding.UTF8.GetString(msg[2].Buffer);
        Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);

        var contains = addresses.Contains(address);
        if (!contains) { addresses.Add(address); }

        msg.Clear();
        msg.Append(address);
        msg.AppendEmptyFrame();
        msg.Append("Reply for: " + address);
        server.SendMultipartMessage(msg);
    }
}

これは、主に次の理由により、どういうわけか正しく感じられません。

  • タイムアウトの適切な値はどれですか? 1 秒、100 ミリ秒など。
  • このプログラムは、毎秒数千のメッセージを送信する 10 万以上のクライアントを接続するために使用されるため、これが最も効率的でパフォーマンスの高いソリューションですか。

これに対する最善のアプローチについての指針は非常に高く評価されています。

4

2 に答える 2