1

akka ストリーム バッファを使用してストリームのスループットを向上させようとしていますが、それが Kafka にどのように適用されるのか疑問に思っています

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))

特に、

 val kafkaSource =
   Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
     .buffer(10000, OverflowStrategy.backpressure)

基礎となる Kafka API に関して、ここで正確に何が起こるのでしょうか?

基礎となる Kafka クライアントに次の構成があります。

.withProperty(AUTO_OFFSET_RESET_CONFIG, offsetReset)
      .withProperty(MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs.toString)
      .withProperty(SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs.toString)
      .withProperty(HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs.toString)
      .withProperty(FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWaitMs.toString)
      .withProperty(MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
      .withProperty(FETCH_MAX_BYTES_CONFIG, maxPollRecords.toString)
      .withProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPollRecords.toString)

したがってMAX_POLL_RECORDS_CONFIG、私は を持っFETCH_MAX_BYTES_CONFIGていますMAX_PARTITION_FETCH_BYTES_CONFIG

私が疑問に思っているのは、基になるクライアントで構成されたフェッチに関してバッファーがどのように再生されるかということです。

  1. Consumer.committableSource独自のアクターで具体化され、そのバッファーを介して、基礎となる Kafka クライアントからメッセージを受信しますか? 基になるクライアントが最大 100 万のメッセージを取得するように構成されており、Actor が1000?のバッファーとして設定されているとします。それはどう言う意味ですか?どうなりますか?アクター バッファーは、Kafka クライアントのポーリング リクエストをオーバーライドしますか? それとも、ポーリングの結果 (基になるクライアントで構成された最大値) が通過するまで、Kafka クライアントがプッシュするメール ボックスにデータを取得しますか?

Kafka ストリームの内部バッファまたは明示的バッファがポーリング リクエストの設定とどのように相互作用するかを知る必要があると思います。

4

0 に答える 0