問題タブ [confluent-cloud]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - ビーム内の KafkaIO によってカフカから読み取ることができません
次のように、Confluent Cloud 上の私の kafka クラスタからデータを読み取るために、次のように Apchea Beam で非常に単純なパイプラインを作成しました。
ただし、上記のコードを実行してカフカクラスターからデータを読み取ると、例外が発生します
私は直接 Java ランナーで上記を実行します。私はビーム 2.8 を使用しています。
上記のコードではできませんが、カフカ合流クラスターへのメッセージを読み取って生成できます。
apache-kafka - Spring Cloud Stream Kafka Stream アプリケーションは、再起動のたびにパーティション event-x のオフセットをオフセット 0 にリセットすることを示しています
トピック (イベント) から読み取り、簡単な処理を実行する Spring Cloud Stream Kafka Stream アプリケーションがあります。
このアプリケーションは、Confluent Cloud の Kafka 環境を使用しており、イベント トピックには 6 つのパーティションがあります。完全な構成は次のとおりです。
メッセージは KStream によって正しく処理されています。アプリケーションを再起動すると、再処理されません。注: 再処理されたくないので、この動作は問題ありません。
ただし、起動ログにはいくつかの奇妙なビットが表示されます。
- 最初に、リストア コンシューマ クライアントの作成が表示されます。自動オフセットリセットなし:
- 次に、自動オフセット リセットが最も早いコンシューマー クライアントを作成します。
- 起動ログの最後のトレースは、オフセットが 0 にリセットされたことを示しています。これは、アプリケーションを再起動するたびに発生します。
2 つのコンシューマーが構成されている理由は何ですか?
auto.offset.reset = earliest
明示的に構成しておらず、Kafka のデフォルトが最新であるのに、2 番目のものがあるのはなぜですか?デフォルト (auto.offset.reset = latest) の動作が必要で、正常に動作しているようです。しかし、ログに表示されている内容と矛盾していませんか?
アップデート:
3 番目の質問を次のように言い換えます。再起動のたびにパーティションが 0 にリセットされていることがログに示されているのに、メッセージが KStream に再配信されないのはなぜですか?
更新 2:
今回は、ネイティブの Kafka Streams アプリケーションを使用して、シナリオを単純化しました。この動作は、Spring Cloud Stream で観察されたものとまったく同じです。ただし、消費者グループと私が見つけたパーティションを調べると、ある程度理にかなっています。
Kストリーム:
これは私が見たものです:
1) トピックが空の場合、起動時にすべてのパーティションがオフセット 0 にリセットされます。
2) トピックに 1 つのメッセージを入れてコンシューマ グループを調べ、レコードがパーティション 4 にあることを確認します。
3) アプリケーションを再起動します。現在、リセットは空のパーティション (0、1、2、3、5) にのみ影響します。
4) 別のメッセージを挿入し、コンシューマー グループの状態を調べると、同じことが起こります。レコードはパーティション 2 にあり、アプリケーションを再起動すると、空のパーティション (0、1、3、5) のみがリセットされます。
python - AIOKafka クライアントを使用して Confluent Cloud に接続する
https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.pyのリポジトリにあるサンプルConfluent Cloud
の修正版を使用して、Kafka クラスターに接続しようとしています。私は正しいパラメータであると信じているもので自分のとを設定しましたが、以下のランタイムエラーが発生しています:AIOKafka
ssl_consume_produce.py
AIOKafka
AIOKafkaConsumer
AIOKafkaProducer
コードの私の適応バージョンは次のとおりです。
私の難読化された構成conf
は次のようになります。
Confluent Cloud
AIOKafka クライアントを使用して接続することは可能ですか? 私の構成に間違っているものはありますか?