masstransit の webapi コントローラーを構成しました。このようなもの 。
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
{
x.Host(new Uri("rabbitmq://localhost/"), h =>{ });
});
_busControl.Start();
コンソールアプリケーションは、私のコマンド/イベントをリッスンしている私のサーバーです-
var BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
//h.Username("guest");
//h.Password("guest");
});
cfg.ReceiveEndpoint(host, "create_product_queue", config => config.Consumer<CreateInventoryProductItemHandler>());
cfg.ReceiveEndpoint(host, "ProductItemCreatedEvent", config => config.Handler<ProductItemCreatedEvent>(async context => await Console.Out.WriteLineAsync($"Product Item Id -----{context.Message.ProductItemId} "))); <---- this never gets executed .
});
BusControl.Start();
そして、コントローラーのアクションは次のようなものです-
[HttpPost("product")]
public async Task<IActionResult> Post([FromBody] Product Product)
{
var endpoint = await Startup.Bus.GetSendEndpoint(new Uri("rabbitmq://localhost/create_product_queue"));
await endpoint.Send<CreateInventoryProductItem>(new
{
ProductName = Product.ProductName,
PartNumber = Product.PartNumber,
ManufactureDate = Product.ManufacturedDate
});
return new HttpStatusCodeResult(StatusCodes.Status202Accepted);
}
そしてハンドラは -
public async Task Consume(ConsumeContext<CreateInventoryProductItem> context)
{
CreateInventoryProductItem Command = context.Message;
Product Product = await Context.SaveAsync(new Product(Command.ProductName
, Command.PartNumber
, Command.ManufacturedDate));
await context.Publish<ProductItemCreatedEvent>(Product.Id); <--- problem area
}
今私のシナリオはです。
コマンド オブジェクトを作成します。
Context.SaveAsync()
新たに作成したEntityフレームワークのメソッドからプロダクトIDを取得します。ProductId をバスに発行して、他のリスナーがこの製品 ID をリッスンし、それに応じて動作できるようにします。
この場合に適用する最適なパターンを見つけようとしています。
問題は -- await context.Publish<ProductItemCreatedEvent>(Product.Id);
この行から始まります。同じキューに無限に何度も公開しようとします。その結果、同じ Product オブジェクトが何百回も挿入されます (一意の制約が発生しないことはかなり不自由ですが、ここでは問題ではありません)。この状況を取り除く方法。私は大量輸送の概念にかなり慣れていません。
なぜこれが起こっているのか、誰かが光を当てることができます。