下の画像に示されている.NetCoreにマルチスレッドのKafkaコンシューマを実装したいと考えています。Java では、Poll メソッドを使用して複数のレコードを取得し、各パーティションのレコードを取得できます。私は.netで同じことを実装したい.
Java の実装。
while (!stopped.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
records.partitions().forEach(partition -> {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
Task task = new Task(partitionRecords);
executor.submit(task);
activeTasks.put(partition, task);
});
}
.net コアのコンシューマー コード。
using var consumer = new ConsumerBuilder<string, string>(
configuration.AsEnumerable())
.SetErrorHandler((_, eb) => Console.WriteLine($"Error: {eb.Reason}"))
.SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned Partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking Assignment: [{string.Join(", ", partitions)}]");
})
.Build();
consumer.Subscribe(topicList);
try
{
while (true)
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed event from topic {cr.Topic} with value {cr.Message.Value}. Offset: {cr.Offset}, Partition: {cr.Partition.Value}");
}
}
catch (OperationCanceledException)
{
// Ctrl-C was pressed.
}
finally
{
consumer.Close();
}