私はMassTransitを初めて使用しますが、理解できないことがあります。
すべてのノードが同じ仕事をすることができるサーバーファームがあるとしましょう。アプリケーションフレームワークはCQRSのスタイルです。つまり、公開する2つの基本的な種類のメッセージがあります。
- コマンド:サーバーの1つだけで処理する必要があります(最初のサーバーはジョブスロットが空いています)
- イベント:すべてのサーバーで処理する必要があります
非常に単純なMassTransitプロトタイプ(X秒ごとにhelloを送信するコンソールアプリケーション)を作成しました。
APIには、「公開」メソッドがあることがわかります。メッセージの種類(1台のサーバーとすべてのサーバー)を指定するにはどうすればよいですか?
「ハンドラー」構成を見ると、キューuriを指定できます。すべてのホストに同じキューを指定すると、すべてのホストにメッセージが表示されますが、実行を1つのサーバーのみに制限することはできません。
ホストの専用キューからリッスンすると、1つのサーバーだけがメッセージを処理しますが、他の種類のメッセージをブロードキャストする方法がわかりません。
私が欠けているものを理解するのを手伝ってください。
PS:気になるなら、私のメッセージングシステムはrabbitmqです。
テストするために、私はこのクラスで共通のクラスライブラリを作成しました:
public static class ActualProgram
{
private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();
private static readonly Random g_Random = new Random();
public static void ActualMain(int delay, int instanceName)
{
Thread.Sleep(delay);
SetupBus(instanceName);
Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
Console.WriteLine("Press enter at any time to exit");
Console.ReadLine();
g_Shutdown.Cancel();
Bus.Shutdown();
}
private static void PublishRandomMessage()
{
Bus.Instance.Publish(new Message
{
Id = g_Random.Next(),
Body = "Some message",
Sender = Assembly.GetEntryAssembly().GetName().Name
});
if (!g_Shutdown.IsCancellationRequested)
{
Thread.Sleep(g_Random.Next(500, 10000));
Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
}
}
private static void SetupBus(int instanceName)
{
Bus.Initialize(sbc =>
{
sbc.UseRabbitMqRouting();
sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
sbc.Subscribe(subs =>
{
subs.Handler<Message>(MessageHandled);
});
});
}
private static void MessageHandled(Message msg)
{
ConsoleColor color = ConsoleColor.Red;
switch (msg.Sender)
{
case "test_app1":
color = ConsoleColor.Green;
break;
case "test_app2":
color = ConsoleColor.Blue;
break;
case "test_app3":
color = ConsoleColor.Yellow;
break;
}
Console.ForegroundColor = color;
Console.WriteLine(msg.ToString());
Console.ResetColor();
}
private static void MessageConsumed(Message msg)
{
Console.WriteLine(msg.ToString());
}
}
public class Message
{
public long Id { get; set; }
public string Sender { get; set; }
public string Body { get; set; }
public override string ToString()
{
return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
}
}
また、ActualMainメソッドを実行するコンソールアプリケーションも3つあります。
internal class Program
{
private static void Main(string[] args)
{
ActualProgram.ActualMain(0, 1);
}
}