2

MassTransit と RabbitMQ を使用して、ステートフル サービスとして Service Fabric で実行されているサービスにメッセージを投稿する Web サイトのデモを作成しようとしています。

すべてが順調に進み、クライアントは次のようなメッセージを投稿しました。

IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);

await endPoint.Send<ICompanyRequest>(new {CompanyId = id });

Service Fabric サービスのコンシューマーは次のように定義されました。

        IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
            {
                h.Username(RabbitMqConstants.UserName);
                h.Password(RabbitMqConstants.Password);
            });

            cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
            {
                e.Consumer<PersonInformationConsumer>();
            });

        });

        busControl.Start();

これにより、クラスでメッセージを使用できるようになり、問題なく処理できます。IReliableDictonary や IReliableQueue など、サービス ファブリック サービスの RunAsync 関数から実行されるコンテキストを参照する必要があるものを使用する場合に問題が発生します。

私の質問は、MassTransit がサービス コンテキスト自体を認識しているステートフル サービス ファブリック サービス内で動作するように構成するにはどうすればよいでしょうか (それは可能ですか) ?

よろしくお願いします。マイク

更新 OK、登録ルーチンをメッセージコンシューマークラスにポイントすると、これでいくらか進歩しました(例):

ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
                ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);

次に、メッセージの消費者クラスで次のことができます。

    internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
    private static StatefulServiceContext _currentContext;

    #region Constructors

    public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
    {
        _currentContext = serviceContext;
    }

    public PersonInformationConsumer() : base(_currentContext)
    {
    }

これで、サービス メッセージを正常に呼び出すことができます。

ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);

私が今抱えている問題は、IReliableDictionary に何かを保存しようとすることです。これを行うと、「オブジェクト参照がオブジェクトのインスタンスに設定されていません」というエラーが発生します :( ... アイデアをいただければ幸いです (ただし、新年まで読むことはできません) !)

        public async Task Consume(ConsumeContext<ICompanyRequest> context)
    {

        ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);

        using (ITransaction tx = StateManager.CreateTransaction())
        {

            try
            {

                var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");

これがエラーの原因です....ヘルプ!:)

4

1 に答える 1

1

MassTransit とステートフル サービスを連携させるには、もう少し作業を行う必要があります。ここで気になる点がいくつかあります。

ステートフル パーティション内のマスター (n パーティション内の n マスター) のみがステートフル サービスに書き込み/更新できます。すべてのレプリカは、状態を書き戻そうとすると例外をスローします。そのため、この問題に対処する必要があります。クラスターの再調整により、マスターがクラスター内を移動できることを考慮するまでは、一見簡単に思えますが、一般的なサービス ファブリック アプリケーションのデフォルトではオフになっています。レプリカで処理を実行し、マスターでのみ作業を実行します。これはすべて RunAsync メソッドによって行われます (試してみて、RunAsync メソッドで何かおかしなことで 3 つのステートフル サービスを実行してから、マスターを終了します)。

また、データのパーティショニングについても考慮する必要があります。ステートフル サービスはパーティションによってスケーリングされるため、データをサービス バス上の個別のエンドポイントに分散する方法を作成する必要があります。おそらく、特定のパーティション範囲のみをリッスンする個別のキューを用意する必要があります。 ? メッセージがあるとしUserCreatedます。これを分割してcountry UK、パーティション 1 にUS移動し、パーティション 2 に移動するなど...

基本的なものを起動して実行したい場合は、1 つのパーティションに制限し、バスの作成を RunAsync 内に配置して、キャンセル トークンでキャンセルが要求されたらバスをシャットダウンします。

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
        {
            h.Username(RabbitMqConstants.UserName);
            h.Password(RabbitMqConstants.Password);
        });

        cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
        {
            // Pass in the stateful service context
            e.Consumer(c => new PersonInformationConsumer(Context));
        });

    });

    busControl.Start();

    while (true)
    {
        if(cancellationToken.CancellationRequested)
        {
            //Service Fabric wants us to stop
            busControl.Stop();

            cancellationToken.ThrowIfCancellationRequested();
        }

        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}
于 2016-12-28T23:37:45.057 に答える