5

私は、2 つのブローカーと 1 つの飼育係を持つカフカ環境を持っています。

Kafka にメッセージを生成しようとしているときに、ブローカー 1 (リーダーのブローカー) を停止すると、ブローカー 2 がトピックとパーティションの新しいリーダーとして選出されますが、クライアントはメッセージの生成を停止し、以下のエラーが表示されます。

org.apache.kafka.common.errors.TimeoutException: 60000 ミリ秒後にメタデータを更新できませんでした。

10 分が経過した後、ブローカー 2 は新しいリーダーであるため、プロデューサーがブローカー 2 にデータを送信することを期待していましたが、上記の例外を与えて失敗し続けました。プロデューサーのメタデータExpireMsは300000ですが、lastRefreshMsとlastSuccessfullRefreshMsは同じです。

プロデューサー側でカフカの新しいプロデューサー実装を使用しています。

プロデューサーが開始されると、1 つのブローカーにバインドされ、そのブローカーがダウンした場合、クラスター内の別のブローカーに接続しようとさえしないようです。

しかし、ブローカーがダウンした場合、利用可能な別のブローカーのメタデータを直接チェックし、それらにデータを送信する必要があります。

ところで、私のトピックは 4 パーティションで、レプリケーション ファクターは 2 です。意味がある場合に備えて、この情報を提供します。

構成パラメータ。

{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}

使用事例:

1- BR1 と BR2 プロデュース データを開始 (リーダーは BR1)

2- BR2 生成データを停止します (細かい)

3- BR1 を停止し (これは、現時点でアクティブな作業ブローカーがクラスターに存在しないことを意味します)、BR2 を開始してデータを生成します (リーダーは BR2 ですが失敗しました)。

4- BR1のデータ作成開始(リーダーはBR2のままですが、データは綺麗に作成されています)

5- BR2 を停止します (現在は BR1 がリーダーです)

6- BR1 を停止します (BR1 はまだリーダーです)

7- BR1 のデータ生成を開始します (メッセージは再び正常に生成されます)。

プロデューサーが最新の成功したデータを BR1 に送信し、その後すべてのブローカーがダウンした場合、プロデューサーは、BR2 が起動して新しいリーダーであっても、BR1 が再び起動することを期待します。これは予想される動作ですか?

4

3 に答える 3

7

何時間も費やした後、私は自分の状況でのカフカの振る舞いを理解しました。これはバグかもしれませんし、内部にある理由でこのようにする必要があるかもしれませんが、実際にそのような実装を行う場合、私はこのようにはしません:)

すべてのブローカーがダウンしたときに、1 つのブローカーしか起動できない場合、メッセージを正常に生成するには、最後にダウンしたブローカーでなければなりません。

5 つのブローカーがあるとします。BR1、BR2、BR3、BR4、BR5。すべてがダウンし、最後に停止したブローカーが BR3 (最後のリーダー) である場合、BR1、BR2、BR4、および BR5 のすべてのブローカーを開始しても、BR3 を開始しない限り意味がありません。

于 2016-03-02T09:20:08.927 に答える
0

最新のkafkaバージョンでは、ブローカーがダウンし、プロデューサーが使用するリーダーパーティションがある場合。プロデューサーは、再試行可能な例外をキャッチするまで再試行します。その後、プロデューサーはメタデータを更新する必要があります。新しいメタデータは、leastLoadNode から取得できます。したがって、新しいリーダーが更新され、プロデューサーはそこに書き込むことができます。

于 2019-10-24T10:19:47.607 に答える