私は次のパターンを持っています:
複数のスレッドが、メッセージをRouterに送信するために単一のスレッド化されたDealerによってポーリングされる ConcurrentQueue にメッセージを送信しています。
複数のメッセージが送信されると、次の例外が発生します。
「SocketException - 非ブロッキング ソケット操作を完了できませんでした」
メッセージをデキューしてディーラーに送信するスレッドのコードは次のとおりです。
Task.Factory.StartNew((state) =>
{
using (NetMQSocket dealerSocket = new DealerSocket(_connectionString))
using (NetMQPoller poller = new NetMQPoller() { dealerSocket })
{
dealerSocket.ReceiveReady += DealerSocketOnReceiveReady;
poller.RunAsync();
while (true)
{
Message<T> message;
if (!_concurrentQueue.TryDequeue(out message)) continue;
_pendingRequests.Add(message.Id, message);
var mpm = new NetMQMessage(4);
mpm.AppendEmptyFrame();
mpm.Append(message.Body);
mpm.AppendEmptyFrame();
mpm.Append(message.Id.ToString());
dealerSocket.SendMultipartMessage(mpm);
}
}
}, TaskCreationOptions.LongRunning, _cancellationTokenSource.Token);
MultipartMessage の送信時に SocketException が発生しました
SendBuffer のサイズや SendHighWatermark のサイズを大きくしようとしましたが、それでも同じエラーが発生します。
この例外を処理してソケットをリセットする必要がありますか?