MassTransit と RabbitMQ を使用して、ステートフル サービスとして Service Fabric で実行されているサービスにメッセージを投稿する Web サイトのデモを作成しようとしています。
すべてが順調に進み、クライアントは次のようなメッセージを投稿しました。
IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);
await endPoint.Send<ICompanyRequest>(new {CompanyId = id });
Service Fabric サービスのコンシューマーは次のように定義されました。
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
e.Consumer<PersonInformationConsumer>();
});
});
busControl.Start();
これにより、クラスでメッセージを使用できるようになり、問題なく処理できます。IReliableDictonary や IReliableQueue など、サービス ファブリック サービスの RunAsync 関数から実行されるコンテキストを参照する必要があるものを使用する場合に問題が発生します。
私の質問は、MassTransit がサービス コンテキスト自体を認識しているステートフル サービス ファブリック サービス内で動作するように構成するにはどうすればよいでしょうか (それは可能ですか) ?
よろしくお願いします。マイク
更新 OK、登録ルーチンをメッセージコンシューマークラスにポイントすると、これでいくらか進歩しました(例):
ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);
次に、メッセージの消費者クラスで次のことができます。
internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
private static StatefulServiceContext _currentContext;
#region Constructors
public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
{
_currentContext = serviceContext;
}
public PersonInformationConsumer() : base(_currentContext)
{
}
これで、サービス メッセージを正常に呼び出すことができます。
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
私が今抱えている問題は、IReliableDictionary に何かを保存しようとすることです。これを行うと、「オブジェクト参照がオブジェクトのインスタンスに設定されていません」というエラーが発生します :( ... アイデアをいただければ幸いです (ただし、新年まで読むことはできません) !)
public async Task Consume(ConsumeContext<ICompanyRequest> context)
{
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
using (ITransaction tx = StateManager.CreateTransaction())
{
try
{
var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
これがエラーの原因です....ヘルプ!:)