2

Kafka メトリックを監視したいのですが、残念ながら /actuator/prometheus エンドポイントの下に Kafka に関連するものは何もありません。セットアップに欠けているものはありますか?

アプリケーションの依存関係: Kotlin 1.4.31、Spring Boot 2.3.9、Spring Kafka 2.6.7、Reactor Kafka 1.2.5、Kafka Clients 2.5.1

アプリケーション構成:

    management:   
      server:
        port: 8081   
      endpoints:
        web:
          exposure:
            include: health,info,metrics,prometheus
    
    spring:
      jmx:
        enabled: true
      kafka:
        bootstrap-servers: ...
        consumer:
          group-id: my-service
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          ack-mode: manual
        ssl:
          key-store-location: ...
          key-store-password: ...
        security:
          protocol: SSL

私の受信機は次のようになります

    @Bean
    fun someEventReceiver(): SomeEventReceiver =
        KafkaReceiver.create(
            ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                .withValueDeserializer(SomeEvenDeserializer())
                .subscription(listOf(serviceProperties.kafka.topics.someevent))
        )

そしてリスナー

    @EventListener(ApplicationStartedEvent::class)
    fun onSomeEvent() {
        someEventReceiver
            .receive()
            .groupBy { it.receiverOffset().topicPartition() }
            .publishOn(Schedulers.boundedElastic())
            .flatMap { someEvent ->
                someEvent
                    .publishOn(Schedulers.boundedElastic())
                    .delayUntil(::handleEvent)
                    .doOnNext { it.receiverOffset().acknowledge() }
                    .retryWhen(Retry.backoff(10, Duration.ofMillis(100)))
            }
            .retryWhen(Retry.indefinitely())
            .subscribe()
    }
4

2 に答える 2

1

@gary-russell が提案したことに従って (助けてくれてありがとう!)、私のプロジェクトでは多くのコンシューマーが存在するため、コードの量を減らすために、リスナーを構築する際に少し異なるアプローチを取りました。

    class KafkaReceiverWithMetrics<K, V>(
        private val receiver: KafkaReceiver<K, V>,
        private val consumerId: String,
        private val metricsListener: MicrometerConsumerListener<K, V>, ) : KafkaReceiver<K, V> by receiver {
        override fun receive(): Flux<ReceiverRecord<K, V>> =
            receiver.receive()
                .doOnSubscribe {
                    receiver
                        .doOnConsumer { consumer -> metricsListener.consumerAdded(consumerId, consumer) }
                        .subscribe()
                } }

そして、リスナーごとに 1 つの Bean が必要です。

    @Bean
    fun someEventReceiver(): SomeEventReceiver =
        KafkaReceiverWithMetrics(
            KafkaReceiver.create(
                ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                    .withValueDeserializer(SomeEventDeserializer())
                    .subscription(listOf(topics.someEvent))
            ),
            topics.someEvent,
            MicrometerConsumerListener(meterRegistry)
        )
于 2021-04-01T08:11:42.360 に答える