0

トピック交換を使用して、次の機能を備えたパブリッシュ/サブスクライブ メッセージング パターンが必要です。

  1. 発行元確認」を実装してもらいます。
  2. 処理が完了したら、コンシューマにも各メッセージを確認してもらいます。
  3. ルーティング キーを使用して、メッセージを 1 つ以上のコンシューマーにルーティングします。
  4. 永続的なコンシューマー キューを用意して、コンシューマー アプリケーションが一時的にダウンした場合に、回復したときにそのキューからメッセージを取得できるようにします。

そこで、上記をテストするために 2 つのコンソール アプリケーション (送信と受信) を作成しました。

送信

    static void Main(string[] args)
    {

        Console.WriteLine(" Type [exit] to exit.");

        Publisher publisher = new Publisher();

        do
        {
            var userInput = Console.ReadLine();
            if (userInput == "exit")
            {
                break;
            }


            publisher.SendMessageToBroker("localhost", "main", "user.update", userInput);

        } while (true);
    }

出版社

public class Publisher
{
    const string ExchangeType = "topic";

    Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>();

    public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message)
    {

        var factory = new ConnectionFactory() { HostName = host };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {                
            channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag);
            channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag);

            channel.ConfirmSelect();

            channel.ExchangeDeclare(exchangeName, ExchangeType);

            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message);

            channel.BasicPublish(exchange: exchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

    private void OnBasicNacks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag);
        }
        else
        {
            Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag);
        }
    }

    private void OnBasicAcks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag);
            foreach (var entry in confirmed)
            {
                unConfirmedMessageTags.Remove(entry.Key);
                Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key);
            }

        }
        else
        {
            unConfirmedMessageTags.Remove(deliveryTag);
            Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag);
        }

    }
}

}

受け取る

    static void Main(string[] args)
    {
        const string ExchangeName = "main";
        const string QueueName = "q1";
        const string ExchangeType = "topic";
        const string RoutingKey = "user.update";

        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeType);

            channel.QueueDeclare(queue: QueueName, 
                durable: true, 
                autoDelete: false, 
                exclusive: false, 
                arguments: null);

            channel.QueueBind(QueueName, ExchangeName, RoutingKey);

            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body);

            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }            
    }

    private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body)
    {            
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] Received {0}", message);

        Thread.Sleep(2000);            

        channel.BasicAck(deliveryTag: deliveryTag, multiple: false);

        Console.WriteLine(" [x] Processed {0}", message);
    }
}

問題は、送信プログラムのOnBasicAcksが最初のメッセージに対して 1 回しか呼び出されないことです。

ここに画像の説明を入力

4

1 に答える 1