4

同じエンドポイント名を使用してパブリッシャーとコンシューマーを作成すると、MassTransit がメッセージを失う状況が発生しました。

以下のコードに注意してください。コンシューマーまたはパブリッシャーのいずれかに別のエンドポイント名を使用すると (たとえば、パブリッシャーの場合は「rabbitmq://localhost/mtlossPublised」)、メッセージはパブリッシュされたものと消費されたものの両方の一致をカウントします。(サンプルのように) 同じエンドポイント名を使用すると、パブリッシュされるよりも消費されるメッセージが少なくなります。

これは予想される動作ですか?または、以下のサンプルコードを実行して、何か間違ったことをしていますか?

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            var publisherBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) });
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();
        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        string Message { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public string Message { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}
4

2 に答える 2

4

要するに、バスのすべてのインスタンスには、読み取る独自のキューが必要です。メッセージを発行するためだけにバスが存在する場合でも。これは、MassTransit が機能するための要件にすぎません。

http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - 警告を参照してください。

2 つのバス インスタンスが同じキューを共有する場合、動作は未定義のままにします。とにかく、それは私たちがサポートする条件ではありません. 各バス インスタンスはメタ データを他のバス インスタンスに送信することができ、独自のエンドポイントが必要です。これは MSMQ との取引がはるかに大きかったため、このケースを RabbitMQ で機能させることができるかもしれませんが、現時点ではあまり考えていません。

于 2012-09-17T23:24:46.750 に答える
1

何が起こっているのかというと、同じレシーバーURIを与えることで、MTに2つのバスの負荷分散消費を指示していますが、メッセージをリッスンしているバスは1つだけです。

受信したメッセージを追跡するために取得すると、(ほぼ)毎秒メッセージが表示されます。

サンプルコードを微調整すると、

We consumed 6 simple messages. Press Enter to terminate the applicaion.
Received 0
Received 3
Received 5
Received 6
Received 7
Received 8

他のバスで消費者を始めれば、あなたはそれらすべてを手に入れるでしょう

We consumed 10 simple messages. Press Enter to terminate the applicaion.
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9

そうです、これは予想される動作だと思います。

これが2人のサブスクライバーによる微調整されたサンプルコードです

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        internal static bool[] msgReceived = new bool[10];
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });
            var publisherBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });
            publisherBus.SubscribeConsumer(() => new MessageConsumer());
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                consumerBus.Publish(new SimpleMessage()
                    {CorrelationId = Guid.NewGuid(), MsgId = i});
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.",
                              MessageConsumer.Count);
            for (int i = 0; i < 10; i++)
                if (msgReceived[i])
                    Console.WriteLine("Received {0}", i);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();

        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        int MsgId { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public int MsgId { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            Program.msgReceived[message.MsgId] = true;
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}
于 2012-09-17T12:05:50.720 に答える