問題タブ [confluent-cloud]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
830 参照

java - ビーム内の KafkaIO によってカフカから読み取ることができません

次のように、Confluent Cloud 上の私の kafka クラスタからデータを読み取るために、次のように Apchea Beam で非常に単純なパイプラインを作成しました。

ただし、上記のコードを実行してカフカクラスターからデータを読み取ると、例外が発生します

私は直接 Java ランナーで上記を実行します。私はビーム 2.8 を使用しています。

上記のコードではできませんが、カフカ合流クラスターへのメッセージを読み取って生成できます。

0 投票する
1 に答える
2393 参照

apache-kafka - Spring Cloud Stream Kafka Stream アプリケーションは、再起動のたびにパーティション event-x のオフセットをオフセット 0 にリセットすることを示しています

トピック (イベント) から読み取り、簡単な処理を実行する Spring Cloud Stream Kafka Stream アプリケーションがあります。

このアプリケーションは、Confluent Cloud の Kafka 環境を使用しており、イベント トピックには 6 つのパーティションがあります。完全な構成は次のとおりです。

メッセージは KStream によって正しく処理されています。アプリケーションを再起動すると、再処理されません。注: 再処理されたくないので、この動作は問題ありません。

ただし、起動ログにはいくつかの奇妙なビットが表示されます。

  1. 最初に、リストア コンシューマ クライアントの作成が表示されます。自動オフセットリセットなし:
  1. 次に、自動オフセット リセットが最も早いコンシューマー クライアントを作成します。
  1. 起動ログの最後のトレースは、オフセットが 0 にリセットされたことを示しています。これは、アプリケーションを再起動するたびに発生します。
  1. 2 つのコンシューマーが構成されている理由は何ですか?

  2. auto.offset.reset = earliest明示的に構成しておらず、Kafka のデフォルトが最新であるのに、2 番目のものがあるのはなぜですか?

  3. デフォルト (auto.offset.reset = latest) の動作が必要で、正常に動作しているようです。しかし、ログに表示されている内容と矛盾していませんか?

アップデート:

3 番目の質問を次のように言い換えます。再起動のたびにパーティションが 0 にリセットされていることがログに示されているのに、メッセージが KStream に再配信されないのはなぜですか?

更新 2:

今回は、ネイティブの Kafka Streams アプリケーションを使用して、シナリオを単純化しました。この動作は、Spring Cloud Stream で観察されたものとまったく同じです。ただし、消費者グループと私が見つけたパーティションを調べると、ある程度理にかなっています。

Kストリーム:

これは私が見たものです:

1) トピックが空の場合、起動時にすべてのパーティションがオフセット 0 にリセットされます。

2) トピックに 1 つのメッセージを入れてコンシューマ グループを調べ、レコードがパーティション 4 にあることを確認します。

3) アプリケーションを再起動します。現在、リセットは空のパーティション (0、1、2、3、5) にのみ影響します。

4) 別のメッセージを挿入し、コンシューマー グループの状態を調べると、同じことが起こります。レコードはパーティション 2 にあり、アプリケーションを再起動すると、空のパーティション (0、1、3、5) のみがリセットされます。

0 投票する
0 に答える
979 参照

python - AIOKafka クライアントを使用して Confluent Cloud に接続する

https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.pyのリポジトリにあるサンプルConfluent Cloudの修正版を使用して、Kafka クラスターに接続しようとしています。私は正しいパラメータであると信じているもので自分のとを設定しましたが、以下のランタイムエラーが発生しています:AIOKafka ssl_consume_produce.pyAIOKafkaAIOKafkaConsumerAIOKafkaProducer

コードの私の適応バージョンは次のとおりです。

私の難読化された構成confは次のようになります。

Confluent CloudAIOKafka クライアントを使用して接続することは可能ですか? 私の構成に間違っているものはありますか?