5

Kafka 0.8.2.1 SimpleConsumer を使用しています。SimpleConsumer と FetchRequestBuilder のいくつかの構成パラメーターの意味を誰かが明確にすることはできますか? KAfka のソース コードを読んでも、ドキュメントは見つかりませんでした。(この質問をkafkaユーザーグループに投稿しようとしましたが、うまくいきませんでした):

-- Q1: SimpleConsumer コンストラクターの署名に、Int ' soTimeout'パラメーターがあります。このタイムアウトの意味は何ですか? これは Kafka ブローカーに接続するためのタイムアウトですか? [または特定の??] Kafkaへのリクエスト(FetchRequestなど)からのレスポンスを取得する際のタイムアウト? 他の何か?

kafka.javaapi.consumer.SimpleConsumer
    (val host: String,
     val port: Int,
     val soTimeout: Int,
     val bufferSize: Int,
     val clientId: String)

-- Q2: また、SimpleConsumer コンストラクターは Int の 'bufferSize' パラメーターを取ります。その意味は何ですか?これは、fetchRequest が発行されたときに SimpleConsumer が読み取るバイト数ですか? それとも、Kafka からの 1 回のフェッチごとに読み取られる最大バイト数ですか?さらに多くのデータが利用可能な場合、複数のフェッチが発生しますか?

-- FetchRequestBuilder を介して FetchRequest を構築する場合 (以下を参照)、' fetchSize 'も指定する必要があります。

FetchRequest req= newFetchRequestBuilder ()
  .clientId(kafkaGroupId)
  .addFetch(topic, partition, offset, fetchSizeInBytes)
  .build();

FetchRequestBuilder のソース コードを見ると、(私は Scala のプロではありませんが) これらの呼び出しは以下のメソッド呼び出しに変換されると思います。FetchRequest に渡される最後のパラメーターは「minBytes」と呼ばれ、これがおそらく正確なフェッチサイズ?. 少なくとも「minBytes」のデータが利用可能でない限り、何もフェッチしないということですか?

class FetchRequestBuilder():
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)

    def build() = {
      val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)

FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
    correlationId: Int = FetchRequest.DefaultCorrelationId,
    clientId: String = ConsumerConfig.DefaultClientId,
    replicaId: Int = Request.OrdinaryConsumerId,
    maxWait: Int = FetchRequest.DefaultMaxWait,
    **minBytes: Int = FetchRequest.DefaultMinBytes**,
...)

だから、私の最後の質問は:

-- Q3: ' bufferSize ' と ' fetchSize/minBytes ' はどのように関連していますか? 彼らは正確に何を定義していますか?一方が他方よりも小さいか大きいかを確認する必要がありますか?

ありがとう、

マリーナ

4

1 に答える 1