10

私はZeroMQを試し、何かを機能させようとしています。私が最初に考えたのは、inprocトランスポートを使用してREP / REQを設定し、2つのスレッド間でメッセージを送信できるかどうかを確認することでした。以下のコードのほとんどはclzmqの例から取られていますが、機能していないようです。

サーバーとクライアントの両方がトランスポートにバインドされていますが、クライアントがそれを実行しようとすると、Sendブロックされてそこにとどまります。私はZeroMQの経験がないので、最初にどこを見ればよいかわかりません。助けていただければ幸いです。問題のある(攻撃的な)コードは次のとおりです。

using System;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
using ZMQ;

namespace PostBox
{
    [TestFixture]
    public class Class1
    {

        private const string Address = "inproc://test";
        private const uint MessageSize = 10;
        private const int RoundtripCount = 100;

        [Test]
        public void Should()
        {
            var clientThread = new Thread(StartClient);
            clientThread.Start();

            var serverThread = new Thread(StartServer);
            serverThread.Start();

            clientThread.Join();
            serverThread.Join();

            Console.WriteLine("Done with life");
        }

        private void StartServer()
        {


            //  Initialise 0MQ infrastructure
            using (var ctx = new Context(1))
            {
                using (var skt = ctx.Socket(SocketType.REP))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Server has bound");

                    //  Bounce the messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        var msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);
                        skt.Send(msg);
                    }
                    Thread.Sleep(1000);
                }
            }

            Console.WriteLine("Done with server");
        }

        private void StartClient()
        {
            Thread.Sleep(2000);

            //  Initialise 0MQ infrastructure
            using (var ctx = new Context(1))
            {
                using (var skt = ctx.Socket(SocketType.REQ))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Client has bound");

                    //  Create a message to send.
                    var msg = new byte[MessageSize];

                    //  Start measuring the time.
                    var watch = new Stopwatch();
                    watch.Start();

                    //  Start sending messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        skt.Send(msg);
                        msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);

                        Console.Write(".");
                    }

                    //  Stop measuring the time.
                    watch.Stop();
                    var elapsedTime = watch.ElapsedTicks;

                    //  Print out the test parameters.
                    Console.WriteLine("message size: " + MessageSize + " [B]");
                    Console.WriteLine("roundtrip count: " + RoundtripCount);

                    //  Compute and print out the latency.
                    var latency = (double)(elapsedTime) / RoundtripCount / 2 *
                        1000000 / Stopwatch.Frequency;
                    Console.WriteLine("Your average latency is {0} [us]",
                        latency.ToString("f2"));
                }
            }

            Console.WriteLine("Done with client");
        }

    }
}

編集:

私は以下の答えの助けを借りてこれを機能させましたが、ローカルトランスポートにバインドするサーバーとリモートトランスポートに接続するクライアントがBindあるConnectので、これを考えると意味があります。更新されたコードは次のとおりです。

using System;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
using ZMQ;

namespace PostBox
{
    [TestFixture]
    public class Class1
    {

        private const string Address = "inproc://test";
        private const uint MessageSize = 10;
        private const int RoundtripCount = 100;

        private static Context ctx;

        [Test]
        public void Should()
        {
            using (ctx = new Context(1))
            {
                var clientThread = new Thread(StartClient);
                clientThread.Start();

                var serverThread = new Thread(StartServer);
                serverThread.Start();

                clientThread.Join();
                serverThread.Join();

                Console.WriteLine("Done with life");
            }
        }

        private void StartServer()
        {
            try
            {
                using (var skt = ctx.Socket(SocketType.REP))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Server has bound");

                    //  Bounce the messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        var msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);
                        skt.Send(msg);
                    }
                    Thread.Sleep(1000);
                }

                Console.WriteLine("Done with server");
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

        private void StartClient()
        {
            Thread.Sleep(2000);

            try
            {
                //  Initialise 0MQ infrastructure
                using (var skt = ctx.Socket(SocketType.REQ))
                {
                    skt.Connect(Address);

                    Console.WriteLine("Client has bound");

                    //  Create a message to send.
                    var msg = new byte[MessageSize];

                    //  Start measuring the time.
                    var watch = new Stopwatch();
                    watch.Start();

                    //  Start sending messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        skt.Send(msg);
                        msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);

                        Console.Write(".");
                    }

                    //  Stop measuring the time.
                    watch.Stop();
                    var elapsedTime = watch.ElapsedTicks;

                    //  Print out the test parameters.
                    Console.WriteLine("message size: " + MessageSize + " [B]");
                    Console.WriteLine("roundtrip count: " + RoundtripCount);

                    //  Compute and print out the latency.
                    var latency = (double)(elapsedTime) / RoundtripCount / 2 *
                                  1000000 / Stopwatch.Frequency;
                    Console.WriteLine("Your average latency is {0} [us]",
                                      latency.ToString("f2"));
                }

                Console.WriteLine("Done with client");
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

    }
}
4

2 に答える 2

14

両方のスレッドが同じコンテキストを使用する必要があると思います。Zeromqガイドでは、プロセスで複数のコンテキストを使用しないことを推奨しています。コンテキストを作成し、そのコンテキストを両方のスレッド間で共有します。これは機能するはずです。

http://zguide.zeromq.org/chapter:allから

プロセスの「コンテキスト」オブジェクトを作成し、それをすべてのスレッドに渡す必要があります。コンテキストはØMQの状態を収集します。inproc:トランスポートを介して接続を作成するには、サーバースレッドとクライアントスレッドの両方が同じコンテキストオブジェクトを共有する必要があります。

于 2010-12-07T07:01:27.623 に答える
2

一方の端だけがもう一方の端をバインドできます。接続する必要があります。複数の接続を持つことができます。

于 2013-03-09T01:21:56.267 に答える