3

Apache Kafka を介して統合されたサービスが多数あり、各サービスにはコンシューマーとプロデューサーがありますが、トピックに多くの負荷がかかると、何かが消費を遅くするような消費速度の低下に直面しています。

私のカフカ消費者実装の例を次に示します。

    public class Consumer : BackgroundService
    {
        private readonly KafkaConfiguration _kafkaConfiguration;
        private readonly ILogger<Consumer> _logger;
        private readonly IConsumer<Null, string> _consumer;
        private readonly IMediator _mediator;

        public Consumer(
            KafkaConfiguration kafkaConfiguration,
            ILogger<Consumer> logger,
            IServiceScopeFactory provider
        )
        {
            _logger = logger;
            _kafkaConfiguration = kafkaConfiguration;
            _mediator = provider.CreateScope().ServiceProvider.GetRequiredService<IMediator>();

            var consumerConfig = new ConsumerConfig
            {
                GroupId = "order-service",
                BootstrapServers = kafkaConfiguration.ConnectionString,
                SessionTimeoutMs = 6000,
                ConsumeResultFields = "none",
                QueuedMinMessages = 1000000,
                SecurityProtocol = SecurityProtocol.Plaintext,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoOffsetStore = false,
                FetchWaitMaxMs = 100,
                AutoCommitIntervalMs = 1000
            };

            _consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            new Thread(() => StartConsumingAsync(stoppingToken)).Start();
            return Task.CompletedTask;
        }

        public async Task StartConsumingAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("orders");

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumedResult = _consumer.Consume(cancellationToken);

                    if (consumedResult == null) continue;

                    var messageAsEvent = JsonSerializer.Deserialize<OrderReceivedIntegrationEvent>(consumedResult.Message.Value);

                    await _mediator.Publish(messageAsEvent, CancellationToken.None);
                }
                catch (Exception e)
                {
                    _logger.LogCritical($"Error {e.Message}");
                }
            }
        }

これが私のプロデューサーの例です:

public class Producer
    {
        protected readonly IProducer<Null, string> Producer;

        protected Producer(string host)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = host,
                Acks = Acks.Leader
            };

            Producer = new ProducerBuilder<Null, string>(producerConfig).Build();
        }

        public void Produce(InitialOrderCreatedIntegrationEvent message)
        {
            var messageSerialized = JsonSerializer.Serialize(message);
            Producer.Produce("orders", new Message<Null, string> {Value = messageSerialized});
        }
    }

ご覧のとおり、コンシューマーは kafka トピックからメッセージを読み取り、メッセージを MediatR INotification オブジェクトに逆シリアル化し、ハンドラーに発行するだけです

ハンドラーは、データベース トランザクション、redis キャッシュの読み取り/書き込み、およびプッシュ通知を操作します。

私のハンドラの例:

public override async Task Handle(OrderReceivedIntegrationEvent notification, CancellationToken cancellationToken)
        {
            try
            {
                // Get order from database
                var order = await _orderRepository.GetOrderByIdAsync(notification.OrderId.ToString());
                
                order.EditOrder(default, notification.Price);
                
                order.ChangeOrderStatus(notification.Status, notification.RejectReason);
                
                // commit the transaction
                if (await _uow.Commit())
                {
                    var cacheModificationRequest = _mapper.Map<CacheOrdersModificationRequestedIntegrationEvent>(order);
                    
                    // send mediatr notification to change cache information in Redis
                    await _bus.Publish(cacheModificationRequest, cancellationToken);
                }
            }
            catch (Exception e)
            {
                _logger.LogInformation($"Error {e.Message}");
            }
        }

しかし、15 秒のランプアップで 2000 リクエストの負荷テストを実行すると、コンシューマーが遅くなり始め、2000 リクエストすべてを消費するのに 2 ~ 5 分かかります。

MediatR レイヤーを削除して Consumer クラスでプロセスの処理を開始すると、パフォーマンスが向上するのではないかと考えていました

または、スループットを向上させる Kafka 構成がある場合は、In Sync トピックのレプリカの Ack を削除するか、しばらくしてからオフセットをコミットします。

最初に、MassTransit ライブラリを使用して kafka を実装しました。その後、この遅い消費速度を見つけた後、ライブラリを Confluet.Kafka に変更して、改善がある場合は MassTransit 抽象化レイヤーを削除しようとしましたが、それでも同じです:

<PackageReference Include="Confluent.Kafka" Version="1.7.0" />

すでに同じ問題に直面している人は誰でも私を助けることができますか?

OBS: 私の Kafka は Kubernetes の 3 つのブローカーを持つクラスターで実行されており、トピックごとに 3 つのレプリケーション ファクターを持つ 6 つのパーティションがあります。

4

0 に答える 0