0

NetMQ (3.3.3.4) を試して、pub-sub パターンを作成しています。

ホスト/サーバーが 1 つのポート (9000) ですべての着信データをリッスンし、別のポート (9001) ですべてのクライアント/サブスクライバーにデータを転送するようにします。

その後、クライアントは 9000 でデータを送信し、9001 で (だれからも) 送信されたすべてのメッセージを受信します。

ドキュメントに従って、以下のコードのようなものを作成しましたが、動作させることができません。ReceiveReady主に、決して呼び出されないので、私は信じています!

それがうまくいくと私が信じている方法:

  • client.Publish最初の行のhost.SubscriberSocket_ReceiveReadyブロックを解除し、データを他のソケットに渡す必要があります
  • データが渡さTaskれると、クライアントで実行されている無限に表示されるはずです

結果:

  • ブレークポイントに// This line is never reached到達することはありません
  • どこにも例外はありません。
  • パブリッシュ = 9000 およびサブスクライブ = 9001 になるようにホストのポートを切り替えても効果はありません
  • Windows ファイアウォールがオフになっているため、ブロックされることはありません
  • PublisherSocketアドレスをコンストラクターに入れている_publisherSocket.Bind(address)か、ホストまたは_publisherSocket.Connect(address)クライアントで使用しているかに違いはありません

私は何を間違っていますか?

ホスト

public class MyNetMQHost {

    private NetMQSocket _publishSocket;
    private NetMQSocket _subscribeSocket;
    private NetMQPoller _poller;

    public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
        Task.Factory.StartNew(() => {
            using (_publishSocket = new PublisherSocket(publishAddress))
            using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
            using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
                _subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
                _poller.Run();
            }
        });
    }

    private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
        var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
        _publishSocket.SendMultipartBytes(data);
    }
}

クライアント

public class MyNetMQClient {

    private readonly NetMQSocket _publishSocket;
    private readonly NetMQSocket _subscribeSocket;

    public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
        _publishSocket = new PublisherSocket(publishAddress);
        _subscribeSocket = new SubscriberSocket(subscribeAddress);

        Task.Factory.StartNew(() => {
            while (true) {
                byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
                int one = 1; // This line is never reached
            }
        });
    }

    public void Publish(byte[] data) {
        _publishSocket.SendFrame(data);
    }
}

テスター

public class Tester {
    public void MyTester() {
        MyNetMQHost host = new MyNetMQHost();
        MyNetMQClient client = new MyNetMQClient();

        client.Publish(Encoding.Unicode.GetBytes("Hello world!");
    }
}
4

1 に答える 1