0

下の画像に示されている.NetCoreにマルチスレッドのKafkaコンシューマを実装したいと考えています。Java では、Poll メソッドを使用して複数のレコードを取得し、各パーティションのレコードを取得できます。私は.netで同じことを実装したい.

ここに画像の説明を入力

Java の実装。

while (!stopped.get()) {
     ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
     records.partitions().forEach(partition -> {
         List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
         Task task = new Task(partitionRecords);
         executor.submit(task);
         activeTasks.put(partition, task);
    });
}

.net コアのコンシューマー コード。

        using var consumer = new ConsumerBuilder<string, string>(
            configuration.AsEnumerable())
            .SetErrorHandler((_, eb) => Console.WriteLine($"Error: {eb.Reason}"))
            .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                Console.WriteLine($"Assigned Partitions: [{string.Join(", ", partitions)}]");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                Console.WriteLine($"Revoking Assignment: [{string.Join(", ", partitions)}]");
            })
            .Build();
        consumer.Subscribe(topicList);
        try
        {
            while (true)
            {
                var cr = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed event from topic {cr.Topic} with value {cr.Message.Value}. Offset: {cr.Offset}, Partition: {cr.Partition.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl-C was pressed.
        }
        finally
        {
            consumer.Close();
        }
4

0 に答える 0