問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
812 参照

apache-kafka - Kafka で複数のコンシューマーを作成して、プロデューサーから同じメッセージを読み取り、それらを受信した後に各コンシューマーで異なるタスクを実行する方法

プロデューサーによって送信された 1 つ以上のパーティションからすべてのメッセージを読み取ることができる 3 つのコンシューマーを作成する必要があるユース ケースがあります。同じメッセージを受信した後、3 つのコンシューマーが 3 つの異なるタスクを実行する必要があります。

これを行う良い方法の 1 つは、異なる group.id を使用してコンシューマー グループを作成することです。

props.put("group.id", UUID.randomUUID().toString())

このアイデアについては、次のリンクを参照しました。

パーティションの Kafka 複数のコンシューマ

これを達成するために ConsumerGroupExample コードをどのように調整するかを知るのに行き詰まっていますか? 複数のコンシューマーを作成するにはどうすればよいですか? メッセージを受信した後、それらを個別に管理するにはどうすればよいですか? ConsumerGroupExample の複数のオブジェクトを作成する必要がありますか?

0 投票する
1 に答える
402 参照

java - Storm Toplology を使用した Kafka キューからのデータの消費

アルゴリズムは、を使用して Kafka キューからデータを読み取っていますKafkaSpout

次の例外に直面しています:

私のJavaコード:

ここで構成で間違っていることを教えてください。参考までに: AWS クラスターで Zookeeper と Storm JVM (ローカル実行) を実行しています。

0 投票する
1 に答える
227 参照

java - Storm ビルド JAR のデプロイ

  1. Kafkaキューからデータを読み取って出力するJavaクラスを開発しました

    /li>
  2. コーディング後、Maven ビルドを JAR ファイルとして実行し、jar を Amazon AWS クラスターに移動しました。

  3. そして、次のようなコマンドを実行しますnohup java -cp uber-***-0.0.1-SNAPSHOT.jar com.***.&&&.kafka.App

しかし、ここでエラーに直面しています。デプロイでどのような間違いを犯しているのか、誰か教えてもらえますか? 私は次のことを考えています:

  • このjarファイルをstrom構成フォルダーにデプロイする必要がありますか? しかし、私は瓶をAWSの別のフォルダに入れました(ストームフォルダではありません)
  • シスアウトの見方
  • プロジェクトに yml ファイルを含める必要がありますか?

以下の例外を見つけてください。

0 投票する
2 に答える
3491 参照

apache-kafka - レコード数を制限する Kafka コンシューマー

Kafka 0.9.0 コンシューマで受け取るレコード数を制限することはできますか?

0 投票する
2 に答える
12959 参照

offset - カフカ消費者オフセット最大値?

私はグーグルでKafkaのドキュメントを読んでいましたが、消費者オフセットの最大値と、最大値の後にオフセットラップアラウンドがあるかどうかを見つけることができませんでした。オフセットは Int64 値であることを理解しているため、最大値は 0xFFFFFFFFFFFFFFFF です。ラップアラウンドがある場合、Kafka はこの状況をどのように処理しますか?

0 投票する
0 に答える
559 参照

apache-kafka - kafka パーティション再割り当てツールを使用して kafka を実行した後に、「パーティションが存在しません」という警告/失敗が表示される

私はカフカ 0.8.1.1 を使用しています。3 ノードの kafka クラスターがあり、いくつかのトピックには約 5 つのパーティションがあります。クラスター内のノード数を 5 に増やし、いくつかのパーティションを既存のトピックから新しいブローカーに移動することを計画しました。

表示されるエラー メッセージ:

検索しましたが、関連する回答が見つかりませんでした。これを整理するためのガイダンス/ヘルプに感謝します。

0 投票する
1 に答える
12947 参照

kafka-consumer-api - Kafka コンシューマーがメッセージを受信しない

私はカフカの初心者です。Kafka プロデューサーと Kafka コンシューマーを作成するために、インターネットで多くの指示を読みました。Kafkaクラスターにメッセージを送信できる前者は成功しました。しかし、私は後者のもので完了しませんでした。この問題を解決するために私を助けてください。私の問題は StackOverflow のいくつかの投稿のように見えましたが、もっと明確に説明したいと思います。Virtual BoxのUbuntuサーバーでKafkaとZookeeperを実行しています。1 つの Kafka クラスターと 1 つの Zookeeper クラスターを持つ最も単純な構成 (ほぼデフォルト) を使用します。

1. プロデューサーとコンシューマーに Kafka のコマンド ラインを使用すると、次のようになります。

2.消費者向けにプロデューサーとKafkaのコマンドラインを使用すると、次のようになります。

3.Producer と Consumer を使用すると、次のようになります。

これはプロデューサーとコンシューマーの私のコードです。実際には、Kafka のいくつかの指示 Web サイトからそれらをコピーしました。

※プロデューサー番組

*消費者プログラム

*私のPOM.xmlファイル

私はあなたの助けにとても感謝しています. どうもありがとうございました。

消費者の画面。実行されているようですが、プロデューサーからのメッセージを受信できません

0 投票する
0 に答える
692 参照

spring - Kafka からの消費に失敗しました。ネストされた例外は java.util.concurrent.ExecutionException: java.lang.IllegalStateException: イテレーターが失敗した状態です

以下は私のコンシューマーとプロデューサーの XML です。トピックでメッセージを正常に送信でき、コンシューマーから読み取ることができますが、コンソールでは以下のエラーが発生します。

10823 [taskExecutor-30] エラー org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Kafka からの消費に失敗しました。ネストされた例外は java.util.concurrent.ExecutionException: java.lang.IllegalStateException: イテレーターが失敗した状態です

コンシューマ XML

プロデューサー XML

0 投票する
1 に答える
2094 参照

java - Kafka Java コンシューマ API の問題

Kafka Java API を使用してメッセージを消費しようとしています。kafka-console-consumer.bat を使用してメッセージを消費できます。ただし、Java API からのメッセージを消費することはできません。エラーやメッセージが表示されません。何が間違っているのか教えてください。

コンソール コマンド

作成されたトピック:

次を使用してメッセージを発行します。

コンシューマー プログラムを実行し、ブローカー コンソールでログを取得すると、次のようになります。

[2015-12-29 11:57:34,448] 情報 /IP (kafka.network.Processor) へのソケット接続を閉じています。

このログを取得するプログラムを閉じると:

java.io.IOException: 既存の接続がリモート ホストによって強制的に閉じられました

上記のプログラムからメッセージを消費できない理由を教えてください。

ただし、を使用してメッセージを消費できます

助けて。

0 投票する
0 に答える
114 参照

spring-integration - Kafka - コンシューマー グループ ID をスプリング統合の単純なコンシューマーで構成できますか?

spring-integration シンプルなコンシューマー - メッセージ駆動型チャネル オプションで kafka コンシューマー グループ ID を構成することは可能ですか?
そうでない場合、1 つのトピックがキュー (同じグループ ID のコンシューマー内) と従来のトピック (異なるグループ ID のコンシューマー間) の両方として機能する必要がある場合、これが最良の代替手段になります。