問題タブ [confluent-cloud]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Confluent Cloud の過度の使用
内部テスト用に、Confluent Cloud のごく普通のインスタンスを使用しています。これはクラウドベースであるため、月が進むにつれてどのくらいのデータが通過するかに関する統計が得られます. 残念ながら、詳細な統計はありません。インスタンスへのバイト数、インスタンスからのバイト数、およびストレージのみです。そこに保存されている約 2MB のデータを転送しましたが、転送量はかなり過剰で、1 日あたり約 4GB です。多くのコンシューマーはありませんが、それらはすべて最新です。コンシューマーのいずれかがオフセット 0 などから繰り返しクエリを実行している場合、奇妙なことは何もないようです。私の質問は: これは典型的な動作ですか? 世論調査のせい?または、他の何か?
@riferrei さん、コメントありがとうございます。混乱して申し訳ありません。明確にするために、次の画像をご覧ください。
これは私が得るすべてです。私の解釈では、3 月中に少なくとも 390 KB 相当のデータを保存しましたが、それ以上ではありません (390 KB = 1024 * 1024 * 0.2766 GB-時間 / 31 日 / 24 時間)。2MB (0.0021 GB) を転送し、請求書によると、138 GB のデータ、つまり 1 日あたり約 4 GB を転送しました。私はそれがどのように起こり得るかを理解しようとしています。
datetime - コンフルエント クラウド メトリック API ISO 8601 時間間隔を理解できませんでした
ドキュメントから: https://docs.confluent.io/current/cloud/metrics-api.html
この間隔形式は何ですか? -05:00
日時で拡張される時間形式を知りませんでした2019-12-19T11:00:00
か? これは何ですか?誰かが明確にすることができますか?
前もって感謝します。
java - Kafka コンシューマーの空のレコード
このトピックについて多くの質問がありますが、これは重複した質問ではありません!
私が直面している問題は、Java 14 と Kafka 2.5.0 を使用して SpringBoot プロジェクトをセットアップしようとしたところ、Consumer がレコードの空のリストを返すことです。ここでのほとんどの回答は、頻繁にポーリングするか、オフセット モードを Earlyに設定するために、いくつかの忘れられたプロパティを示しています。
私の構成設定は型にはまらないように見えますが、 docs.confluent.ioとの論理的な違いはわかりません(以下のスニペットのjaas.confの設定を参照してください)。
ただし、これは機能します。例外 (Kafka など) は発生せず、接続が確立されます。
これが私が実際にポーリングしているところです:
問題を見つけようとするために、印刷物やその他の混乱を追加しました。プログラムをデバッグ モードで実行し、次の行にブレークポイントを配置しました。
その結果、予想どおり、1 秒ごとのカウントが出力された行が表示されます。しかし、私のコンシューマーはレコードを返さないため、決して入力せforEach
ず、ブレークポイントをトリガーしません。
私のトピックは、2 つのパーティションがあるクラウドで確実に見ることができます。メッセージは安定した流れで生成されるので、何かを拾うことができるはずです。
クラスターに接続するのに時間がかかることはわかっていますが、現在の時刻が 15 分に設定されているので、少なくとも何かを受け取るはずですよね? 別の方法として、 TopicPartition を指定し、コンシューマーを に設定consumer.subscribe()
したメソッドに を切り替えてみました。正常に実行されましたが、何も返されませんでした。consumer.assign()
consumer.seekToBeginning()
最も一般的な例に見られないもう 1 つのことは、独自のクラスを使用していることです。の代わりに、このチュートリアルKafkaConsumer<String, String>
に従ってカスタム (デ) シリアライザーを実装しました。
それは私の構成設定でしょうか?ポーリングのタイムアウトに何か問題がありますか? (デ)シリアライゼーション、または完全に別のもの? レコードがゼロになっている理由を正確に特定することはできません。どんなフィードバックでも大歓迎です!