0

Confluent.Kafka を使用する .Net コア コンソール アプリケーションがあります。特定のトピックからメッセージを消費するためのコンシューマーを構築します。このアプリは、毎日数回実行され、指定されたトピックに関するメッセージを消費して処理することを目的としています。

コンシューマの動作を理解するのに少し時間がかかりましたが、グループ ID が以前に使用されたことのないものである場合にのみメッセージを消費します。コンシューマーの groupId を変更するたびに、コンシューマーはサブスクライブしたトピックのメッセージを取得します。しかし、次の実行では、同じ groupId で、consume.Consume は null を返します。

この行動は、同じグループの消費者間の再調整に関連しているようです。しかし、理由はわかりません。消費者はアプリケーションのリフトタイム全体でのみ存在する必要があるためです。アプリを終了する前に、consumer.close() と consumer.Dispose() を呼び出します。これらはコンシューマーを破棄する必要があるため、次の実行時にコンシューマーを作成すると、指定された groupId の最初の単一のコンシューマーになります。しかし、私が言ったように、これは実際に起こることではありません。

トピックに関するメッセージがあることは知っています-コマンドラインで確認します。また、トピックにパーティションが 1 つしかないことも確認しました。

最も奇妙なことは、同じプロセスを実行する別の .net コア コンソール アプリがあり、まったく問題がないことです。

2つのアプリのコードを添付します。

作業中のアプリ - 常に消費:

class Program
    {
        ...
        
        static void Main(string[] args)
        {
            if (args.Length != 2)
            {
                Console.WriteLine("Please provide topic name to read and SMTP topic name");
            }
            else
            {
                var services = new ServiceCollection();
                services.AddSingleton<ConsumerConfig, ConsumerConfig>();
                services.AddSingleton<ProducerConfig, ProducerConfig>();

                var serviceProvider = services.BuildServiceProvider();

                var cConfig = serviceProvider.GetService<ConsumerConfig>();
                var pConfig = serviceProvider.GetService<ProducerConfig>();

                cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
                cConfig.GroupId = "confluence-consumer";
                cConfig.EnableAutoCommit = true;
                cConfig.StatisticsIntervalMs = 5000;
                cConfig.SessionTimeoutMs = 6000;
                cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                cConfig.EnablePartitionEof = true;

                pConfig.BootstrapServers = Environment.GetEnvironmentVariable("producer_bootstrap_servers");

                var consumer = new ConsumerHelper(cConfig, args[0]);

                messages = new Dictionary<string, Dictionary<string, UserMsg>>();

                var result = consumer.ReadMessage();
                while (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"Current consumed msg-json: {result.Message.Value}");

                    ...
                    
                    result = consumer.ReadMessage();
                }

                consumer.Close();
                Console.WriteLine($"Done consuming messages from topic {args[0]}");


            }

        }

クラス ConsumerHelper.cs

namespace AggregateMailing
{
    using System;
    using Confluent.Kafka;
    public class ConsumerHelper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private IConsumer<string, string> _consumer;

        public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
        {
            try
            {
                _topicName = topicName;
                _consumerConfig = consumerConfig;

                var builder = new ConsumerBuilder<string, string>(_consumerConfig);
                _consumer = builder.Build();

                _consumer.Subscribe(_topicName);
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
            }
        }

        public ConsumeResult<string, string> ReadMessage()
        {
            Console.WriteLine("ReadMessage: start");
            try
            {
                return _consumer.Consume();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ReadMessage: {exc.ToString()}");
                return null;
            }
        }

        public void Close()
        {
            Console.WriteLine("Close: start");
            try
            {
                _consumer.Close();
                _consumer.Dispose();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Close: {exc.ToString()}");
            }
        }
    }
}

動作しないアプリ - コンシューマー groupId を使用されていないものに変更した後、最初の実行時にのみ消費します:

クラス Program.cs

class Program
    {
        private static SmtpClient smtpClient;
        private static Random random = new Random();
        static void Main(string[] args)
        {
            try
            {
                var services = new ServiceCollection();
                services.AddSingleton<ConsumerConfig, ConsumerConfig>();
                services.AddSingleton<SmtpClient>(new SmtpClient("smtp.gmail.com"));

                var serviceProvider = services.BuildServiceProvider();

                var cConfig = serviceProvider.GetService<ConsumerConfig>();
                cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
                cConfig.GroupId = "smtp-consumer";
                cConfig.EnableAutoCommit = true;
                cConfig.StatisticsIntervalMs = 5000;
                cConfig.SessionTimeoutMs = 6000;
                cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                cConfig.EnablePartitionEof = true;


                var consumer = new ConsumerHelper(cConfig, args[0]);

                ...
                
                var result = consumer.ReadMessage();
                while (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"current consumed message: {result.Message.Value}");
                    var msg = JsonConvert.DeserializeObject<EmailMsg>(result.Message.Value);

                    result = consumer.ReadMessage();
                }

                Console.WriteLine("Done sending emails consumed from SMTP topic");
                consumer.Close();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Main: {exc.ToString()}");
            }

        }

クラス ConsumerHelper.cs

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace Mailer
{
    public class ConsumerHelper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private IConsumer<string, string> _consumer;
        public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
        {
            try
            {
               _topicName = topicName;
               _consumerConfig = consumerConfig;

               var builder = new ConsumerBuilder<string, string> (_consumerConfig);
               _consumer = builder.Build();

               _consumer.Subscribe(_topicName);
               //_consumer.Assign(new TopicPartition(_topicName, 0));
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
            }
        }
        public ConsumeResult<string, string> ReadMessage()
        {
            Console.WriteLine("ConsumeResult: start");
            try
            {
                
                return _consumer.Consume();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumeResult: {exc.ToString()}");
                return null;
            }
        }
        public void Close()
        {
            Console.WriteLine("Close: start");
            try
            {
                _consumer.Close();
                _consumer.Dispose();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Close: {exc.ToString()}");
            }
            Console.WriteLine("Close: end");
        }

    }
}
4

0 に答える 0