一部のパブ/サブ (パブリッシャ/サブスクライバ) コードに .Net RabbitMQ を使用しています。消費者の閉鎖を開始するまで、すべてが正常に機能します。最後のコンシューマーを閉じるまで、コンシューマーは公開されたデータを適切に処理します。すべてのコンシューマーの後、新しいコンシューマーを開きますが、何も起こりません。アプリケーションは開きますが、パブリッシャーからデータを受け取りません。
パブリッシャーのコードを調べたところ、最後のコンシューマーが閉じると、そのチャネルのIsOpenプロパティが false になることがわかりました。コンシューマーが閉じられた後でもチャネルを開いたままにする設定があるかどうかはわかりません。
これが私の発行者コードです: 編集最初に間違ったコードを貼り付けました。
そして、ここに私の消費者コードがあります:
public MyConsumer
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly Timer _timer;
private SubscriptionConsumerType(string ipAddress, string exchangeName, TimeSpan tsPullCycle)
{
//set up connection
this._factory = new ConnectionFactory();
this._factory.HostName = ipAddress;
this._connection = this._factory.CreateConnection();
this._channel = this._connection.CreateModel();
//set up and bind the exchange
this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());
string queueName = this._channel.QueueDeclare().QueueName;
this._channel.QueueBind(queueName, exchangeName, "");
//start consuming
QueueingBasicConsumer consumer = new QueueingBasicConsumer(this._channel);
this._channel.BasicConsume(queueName, true, consumer);
//periodically check for new messages from the publisher
this._timer = new Timer(new TimerCallback(this.TimerStep), consumer, tsPullCycle, tsPullCycle);
}
public void Dispose()
{
if (this._timer != null)
this._timer.Dispose();
if (this._channel != null)
{
this._channel.Close();
this._channel.Dispose();
}
if (this._connection != null)
{
this._connection.Close();
this._connection.Dispose();
}
}
}
現在、これに対する私の回避策は、常にどこかで消費者ウィンドウを開いておくことです。ただし、理想的には、開いているコンシューマー ウィンドウの数に関係なく、パブリッシャーを実行したいと考えています。ありがとう。
編集おっと、間違ったプロデューサー コードを貼り付けました。これがそれです:
private SubscriptionBroadcastType(string ipAddress, string exchangeName)
{
this._factory = new ConnectionFactory();
this._factory.HostName = ipAddress;
this._connection = this._factory.CreateConnection();
this._channel = this._connection.CreateModel();
this._exchangeName = exchangeName;
this._channel.ExchangeDeclare(exchangeName, SubscriptionBroadcastType.BROADCAST, SubscriptionBroadcastType.DURABLE, SubscriptionBroadcastType.AUTO_DELETE, new Dictionary<string, object>());
}
public void BroadcastMessage(string message)
{
lock (this._syncroot) //protect _channel
{
if (this._channel.IsOpen)
this._channel.BasicPublish(this._exchangeName, "", null, System.Text.Encoding.UTF8.GetBytes(message));
}
}