1

masstransit の webapi コントローラーを構成しました。このようなもの 。

_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
            {
                x.Host(new Uri("rabbitmq://localhost/"), h =>{ });
            });
            _busControl.Start();

コンソールアプリケーションは、私のコマンド/イベントをリッスンしている私のサーバーです-

 var BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
             {
                 var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
                {
                    //h.Username("guest");
                    //h.Password("guest");
                });

                 cfg.ReceiveEndpoint(host, "create_product_queue", config => config.Consumer<CreateInventoryProductItemHandler>());
                 cfg.ReceiveEndpoint(host, "ProductItemCreatedEvent", config => config.Handler<ProductItemCreatedEvent>(async context => await Console.Out.WriteLineAsync($"Product Item Id -----{context.Message.ProductItemId} "))); <---- this never gets executed . 
             });
            BusControl.Start();

そして、コントローラーのアクションは次のようなものです-

[HttpPost("product")]
        public async Task<IActionResult> Post([FromBody] Product Product)
        {
            var endpoint = await Startup.Bus.GetSendEndpoint(new Uri("rabbitmq://localhost/create_product_queue"));
            await endpoint.Send<CreateInventoryProductItem>(new
            {
                ProductName = Product.ProductName,
                PartNumber = Product.PartNumber,
                ManufactureDate = Product.ManufacturedDate
            });
            return new HttpStatusCodeResult(StatusCodes.Status202Accepted);
        }

そしてハンドラは -

public async Task Consume(ConsumeContext<CreateInventoryProductItem> context)
        {
            CreateInventoryProductItem Command = context.Message;
            Product Product = await Context.SaveAsync(new Product(Command.ProductName
                , Command.PartNumber
                , Command.ManufacturedDate));
            await context.Publish<ProductItemCreatedEvent>(Product.Id); <--- problem area

        }

今私のシナリオはです。

  1. コマンド オブジェクトを作成します。

  2. Context.SaveAsync()新たに作成したEntityフレームワークのメソッドからプロダクトIDを取得します。

  3. ProductId をバスに発行して、他のリスナーがこの製品 ID をリッスンし、それに応じて動作できるようにします。

    この場合に適用する最適なパターンを見つけようとしています。

問題は -- await context.Publish<ProductItemCreatedEvent>(Product.Id);この行から始まります。同じキューに無限に何度も公開しようとします。その結果、同じ Product オブジェクトが何百回も挿入されます (一意の制約が発生しないことはかなり不自由ですが、ここでは問題ではありません)。この状況を取り除く方法。私は大量輸送の概念にかなり慣れていません。

なぜこれが起こっているのか、誰かが光を当てることができます。

4

1 に答える 1

1

問題は、Publish使用しているオーバーロードにオブジェクトを渡す必要があることです。

await context.Publish<ProductItemCreatedEvent>(Product.Id);

とのProductItemCreatedEvent2 つのプロパティが含まれている場合、匿名オブジェクトを作成してインターフェイスを初期化します。IdDescription

await context.Publish<ProductItemCreatedEvent>(new
{
    Id = Product.Id,
    Description = Product.Description,
});

このようにして、動的に生成されたイベント インターフェイスのバッキング クラスを開始できるプロパティを持つオブジェクトが渡されます。

于 2015-12-31T22:59:53.887 に答える