NetMQ (3.3.3.4) を試して、pub-sub パターンを作成しています。
ホスト/サーバーが 1 つのポート (9000) ですべての着信データをリッスンし、別のポート (9001) ですべてのクライアント/サブスクライバーにデータを転送するようにします。
その後、クライアントは 9000 でデータを送信し、9001 で (だれからも) 送信されたすべてのメッセージを受信します。
ドキュメントに従って、以下のコードのようなものを作成しましたが、動作させることができません。ReceiveReady
主に、決して呼び出されないので、私は信じています!
それがうまくいくと私が信じている方法:
client.Publish
最初の行のhost.SubscriberSocket_ReceiveReady
ブロックを解除し、データを他のソケットに渡す必要があります- データが渡さ
Task
れると、クライアントで実行されている無限に表示されるはずです
結果:
- ブレークポイントに
// This line is never reached
到達することはありません - どこにも例外はありません。
- パブリッシュ = 9000 およびサブスクライブ = 9001 になるようにホストのポートを切り替えても効果はありません
- Windows ファイアウォールがオフになっているため、ブロックされることはありません
PublisherSocket
アドレスをコンストラクターに入れている_publisherSocket.Bind(address)
か、ホストまたは_publisherSocket.Connect(address)
クライアントで使用しているかに違いはありません
私は何を間違っていますか?
ホスト
public class MyNetMQHost {
private NetMQSocket _publishSocket;
private NetMQSocket _subscribeSocket;
private NetMQPoller _poller;
public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
Task.Factory.StartNew(() => {
using (_publishSocket = new PublisherSocket(publishAddress))
using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
_subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
_poller.Run();
}
});
}
private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
_publishSocket.SendMultipartBytes(data);
}
}
クライアント
public class MyNetMQClient {
private readonly NetMQSocket _publishSocket;
private readonly NetMQSocket _subscribeSocket;
public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
_publishSocket = new PublisherSocket(publishAddress);
_subscribeSocket = new SubscriberSocket(subscribeAddress);
Task.Factory.StartNew(() => {
while (true) {
byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
int one = 1; // This line is never reached
}
});
}
public void Publish(byte[] data) {
_publishSocket.SendFrame(data);
}
}
テスター
public class Tester {
public void MyTester() {
MyNetMQHost host = new MyNetMQHost();
MyNetMQClient client = new MyNetMQClient();
client.Publish(Encoding.Unicode.GetBytes("Hello world!");
}
}