2

一部のパブ/サブ (パブリッシャ/サブスクライバ) コードに .Net RabbitMQ を使用しています。消費者の閉鎖を開始するまで、すべてが正常に機能します。最後のコンシューマーを閉じるまで、コンシューマーは公開されたデータを適切に処理します。すべてのコンシューマーの後、新しいコンシューマーを開きますが、何も起こりません。アプリケーションは開きますが、パブリッシャーからデータを受け取りません。

パブリッシャーのコードを調べたところ、最後のコンシューマーが閉じると、そのチャネルのIsOpenプロパティが false になることがわかりました。コンシューマーが閉じられた後でもチャネルを開いたままにする設定があるかどうかはわかりません。

これが私の発行者コードです: 編集最初に間違ったコードを貼り付けました。

そして、ここに私の消費者コードがあります:

public MyConsumer
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly Timer _timer;

private SubscriptionConsumerType(string ipAddress, string exchangeName, TimeSpan tsPullCycle)
{
    //set up connection
    this._factory = new ConnectionFactory();
    this._factory.HostName = ipAddress;
    this._connection = this._factory.CreateConnection();
    this._channel = this._connection.CreateModel();

    //set up and bind the exchange
    this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());
    string queueName = this._channel.QueueDeclare().QueueName;
    this._channel.QueueBind(queueName, exchangeName, "");

    //start consuming
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(this._channel);
    this._channel.BasicConsume(queueName, true, consumer);

    //periodically check for new messages from the publisher
    this._timer = new Timer(new TimerCallback(this.TimerStep), consumer, tsPullCycle, tsPullCycle);
}

public void Dispose()
{
    if (this._timer != null)
        this._timer.Dispose();

    if (this._channel != null)
    {
        this._channel.Close();
        this._channel.Dispose();
    }

    if (this._connection != null)
    {
        this._connection.Close();
        this._connection.Dispose();
    }
}
}

現在、これに対する私の回避策は、常にどこかで消費者ウィンドウを開いておくことです。ただし、理想的には、開いているコンシューマー ウィンドウの数に関係なく、パブリッシャーを実行したいと考えています。ありがとう。

編集おっと、間違ったプロデューサー コードを貼り付けました。これがそれです:

private SubscriptionBroadcastType(string ipAddress, string exchangeName)
{
    this._factory = new ConnectionFactory();
    this._factory.HostName = ipAddress;
    this._connection = this._factory.CreateConnection();
    this._channel = this._connection.CreateModel();

    this._exchangeName = exchangeName;
    this._channel.ExchangeDeclare(exchangeName, SubscriptionBroadcastType.BROADCAST, SubscriptionBroadcastType.DURABLE, SubscriptionBroadcastType.AUTO_DELETE, new Dictionary<string, object>());
}

public void BroadcastMessage(string message)
{
    lock (this._syncroot) //protect _channel
    {
        if (this._channel.IsOpen)
            this._channel.BasicPublish(this._exchangeName, "", null, System.Text.Encoding.UTF8.GetBytes(message));
    }
}
4

1 に答える 1

2

ここで根本的に何かがおかしいのではないかと思います。正しいコードを公開していることを確認してください。私が読んだように、あなたは特定の名前付きキューを作成し、キューに直接公開するプロデューサーを持っています。特定の名前付きエクスチェンジを作成し、動的に名前が付けられた新しいキューを作成し、それをエクスチェンジにバインドするコンシューマーがあります。次に、このキューから読み取りますが、最初に公開したキューにすることはできません。

最初にコードを修正して、コンシューマーコードでアクセスできる特定の名前でパブリッシャーコードに交換を作成します。この行は、キュー宣言行の代わりにプロデューサースレッドに表示されます。

this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());

次に、その取引所に公開する必要があるため、キューに公開する行の代わりに、次のように変更します。

this._channel.BasicPublish(exchangeName, "", this._basicProperties, System.Text.Encoding.UTF8.GetBytes(message));

コンシューマーは、キューからこれらのメッセージをそのまま受信するように正しく設定する必要があります。それがあなたの問題に役立つかどうか見てください。

于 2012-06-04T14:06:15.350 に答える