12

私はMassTransitを初めて使用しますが、理解できないことがあります。

すべてのノードが同じ仕事をすることができるサーバーファームがあるとしましょう。アプリケーションフレームワークはCQRSのスタイルです。つまり、公開する2つの基本的な種類のメッセージがあります。

  • コマンド:サーバーの1つだけで処理する必要があります(最初のサーバーはジョブスロットが空いています)
  • イベント:すべてのサーバーで処理する必要があります

非常に単純なMassTransitプロトタイプ(X秒ごとにhelloを送信するコンソールアプリケーション)を作成しました。

APIには、「公開」メソッドがあることがわかります。メッセージの種類(1台のサーバーとすべてのサーバー)を指定するにはどうすればよいですか?

「ハンドラー」構成を見ると、キューuriを指定できます。すべてのホストに同じキューを指定すると、すべてのホストにメッセージが表示されますが、実行を1つのサーバーのみに制限することはできません。

ホストの専用キューからリッスンすると、1つのサーバーだけがメッセージを処理しますが、他の種類のメッセージをブロードキャストする方法がわかりません。

私が欠けているものを理解するのを手伝ってください。

PS:気になるなら、私のメッセージングシステムはrabbitmqです。

テストするために、私はこのクラスで共通のクラスライブラリを作成しました:

public static class ActualProgram
{
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();

    private static readonly Random g_Random = new Random();

    public static void ActualMain(int delay, int instanceName)
    {
        Thread.Sleep(delay);
        SetupBus(instanceName);

        Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);

        Console.WriteLine("Press enter at any time to exit");
        Console.ReadLine();
        g_Shutdown.Cancel();

        Bus.Shutdown();
    }

    private static void PublishRandomMessage()
    {
        Bus.Instance.Publish(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });

        if (!g_Shutdown.IsCancellationRequested)
        {
            Thread.Sleep(g_Random.Next(500, 10000));
            Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        }
    }

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }

    private static void MessageHandled(Message msg)
    {
        ConsoleColor color = ConsoleColor.Red;
        switch (msg.Sender)
        {
            case "test_app1":
                color = ConsoleColor.Green;
                break;

            case "test_app2":
                color = ConsoleColor.Blue;
                break;

            case "test_app3":
                color = ConsoleColor.Yellow;
                break;
        }
        Console.ForegroundColor = color;
        Console.WriteLine(msg.ToString());
        Console.ResetColor();
    }

    private static void MessageConsumed(Message msg)
    {
        Console.WriteLine(msg.ToString());
    }
}

public class Message
{
    public long Id { get; set; }

    public string Sender { get; set; }

    public string Body { get; set; }

    public override string ToString()
    {
        return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
    }
}

また、ActualMainメソッドを実行するコンソールアプリケーションも3つあります。

internal class Program
{
    private static void Main(string[] args)
    {
        ActualProgram.ActualMain(0, 1);
    }
}
4

1 に答える 1

10

必要なのは競合するコンシューマーとして知られています(詳細については、SOを検索してください)RabbitMQを使用すると、作業が簡単になります。開始する各コンシューマーに同じキュー名を指定するだけで、メッセージはによってのみ処理されます。それらの中の一つ。毎回一意のキューを生成する代わりに。

private static void SetupBus(int instanceName)
{
    Bus.Initialize(sbc =>
    {
        sbc.UseRabbitMqRouting();
        sbc.ReceiveFrom("rabbitmq://localhost/Commands);
        sbc.Subscribe(subs =>
        {
            subs.Handler<Message>(MessageHandled);
        });
    });
}

AFAIK、イベントハンドラーではなく、コマンドハンドラー用に別のプロセスが必要になります。すべてのコマンドハンドラーはReceiveFrom同じキューになり、すべてのイベントハンドラーはReceiveFrom独自のキューになります。

パズルのもう1つのピースは、メッセージをバスに取り込む方法です。コマンドにpublishを使用することもできますが、コンシューマーを誤って構成した場合、メッセージがすべてのコンシューマーに送信されるため、複数の実行が発生する可能性があります。メッセージが単一のキューに収まるようにする場合Sendは、ではなく使用できますPublish

     Bus.Instance
         .GetEndpoint(new Uri("rabbitmq://localhost/Commands"))
        .Send(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });
于 2012-08-10T12:01:17.077 に答える