問題タブ [apache-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
2734 参照

java - カフカ コンシューマ グループがハングする

メッセージを待ってハングする Kafka の ConsumerGroupExample コンシューマを実行しています

メッセージを待ってハングしているようです (たとえば、https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example、「for (final KafkaStream stream : streams)」で待機しています)。

構成の詳細: (a) VMPlayer、Ubuntu オペレーティング システムで実行。(b) Kafka バージョンの実行: 0.8.0; (c) 同じ VM (ubuntu/localhost) で Zookeeper、kafka、consumer、および Producer を実行する。(d) このバージョンの Kafka にバンドルされている Zookeeper を使用する。(e) Producer の実行例 (変更なし、cwiki.apache.org/confluence/display/KAFKA/ で入手可能); (f) 実行中のコンシューマ グループの例 (変更なし、ConsumerGroupExample)。(g) トピックのセットアップに成功しました (kafka-create-topic)

私が行ったいくつかの手順: (a) 飼育係、kafka ブローカーを繰り返し停止/開始しました。(b) コンシューマが開始する前にプロデューサがメッセージを発行していた。(c) プロデューサの前にコンシューマを実行する (そして、プロデューサにメッセージを送信させる)。(d) kafka のコンソール コンシューマー (kafka-console-consumer) が機能し、プロデューサーによって発行されたメッセージを消費できることを確認しました。(e) ログにエラー/例外がない

私が試したすべてのケースで、キューに多くのメッセージがあるという証拠にもかかわらず、コンシューマー (ConsumerGroupExample) はメッセージを待ってハングします。

見落としている構成上の問題はありますか?

どんな助けでも大歓迎です!

0 投票する
1 に答える
1748 参照

scala - Apache Kafka のビルド

Scala 2.10.1 で Kafka をビルドしようとしています。Git-hubに記載されている次の手順を試しました。最後に、ターゲット ディレクトリに Jar を生成しますが、その Jar は空で、サイズは 5kb です。ここで何か不足していますか?私はSBTにまったく慣れていません。

1) ./sbt update 2) ./sbt パッケージ 3) ./sbt assembly-package-dependency

特定のバージョンの Scala (2.8.0、2.8.2、2.9.1、2.9.2、または 2.10.1 のいずれか) 用にビルドするには、上記の手順 2 を次のように変更します。 2. ./sbt "++2.8.0 package"

0 投票する
1 に答える
9747 参照

apache-kafka - パーティションの数がコンシューマーの数を上回っている場合の Apache Kafka メッセージの消費

唯一のコンシューマー グループにコンシューマーがいるよりも多くのパーティションで Kafka クラスターを実行している場合。メッセージの順序付け、またはパーティション間でのメッセージの時間どおりの配信について保証はありますか?

簡単な例:
2 つのパーティション、1 つのコンシューマー
プロデューサーは、キーを介してパーティションの割り当てを制御しています。
メッセージ 1 が入ってパーティション A に行く
メッセージ 2 が入ってパーティション B に行く
メッセージ 3 が入ってパーティション A に行く

メッセージ 1 はメッセージ 3 の前に消費されることがわかっています。これらは同じパーティションにあるためです。しかし、メッセージ 2 はどうでしょうか。メッセージ 3 の前または後に消費されますか? または、それは異なる可能性がありますか?メッセージ 1 の前に消費される可能性はありますか?

さらに、新しいメッセージが引き続きパーティション A に着信し、生産が消費よりも速い場合はどうなるでしょうか? メッセージ 2 は無期限にパーティション B に置かれますか? いつ消費されますか?メッセージが永遠にそこに留まらないという保証はありますか?

より一般的に: コンシューマーが複数のパーティションに割り当てられている場合、そのコンシューマーはそれらのパーティション間でいつどのようにスワップしますか?

0 投票する
1 に答える
596 参照

scala - clojure で scala オブジェクトを作成する方法

kafka.utils.ZKStringSerializerclojure でscala オブジェクトを作成しようとしています。(中にありますorg.apache.kafka/kafka_2.10 "0.8.0"

私は scala についてほとんど知らないので、そのコンストラクターを呼び出す方法がわかりません。私はこのように試しました:

エラーが発生しました:CompilerException java.lang.IllegalArgumentException: No matching ctor found for class kafka.utils.ZKStringSerializer

メソッドを確認するために使用(clojure.reflect/reflect ZKStringSerializer)してみましたが、いくつかの静的メソッドしかありません。そして(class ZKStringSerializer)、それは私が望むインスタンスではなく、クラスであると教えてくれます。

オブジェクトは次のように実装されます。

0 投票する
0 に答える
242 参照

c++ - libkafka を使用して非同期のノンブロッキング メッセージを生成するにはどうすればよいですか?

このようなものを呼び出してメッセージを生成するループがあります。

sendProduceRequest をノンブロッキングにするにはどうすればよいですか?

0 投票する
1 に答える
4039 参照

apache-kafka - 圧縮された (snappy) メッセージを kafka に送信するときの UnsatisfiedLinkError

私の Java Web アプリでは、メッセージをkafkaに送信しています。

メッセージを送信する前に圧縮したいので、プロデューサーのプロパティを設定しています。

props.put("compression.codec", "2");

私が理解しているように、「2」はスナッピーの略ですが、メッセージを送信すると次のようになります。

それを解決するために、pom に snappy 依存関係を追加しようとしました:

/lib/extの下のJettyサーバーにjarを追加しますが、それでもこのエラーが発生します。

「compression.codec」プロパティに「2」ではなく「0」を設定すると、予期したとおり例外が発生しません。

snappy 圧縮を使用できるようにするにはどうすればよいですか?

これは私のすてきなバージョンです (別のものを使用する必要がありますか?): 1.1.0.1

Ubuntu 12.10 で動作する jetty 8.1.9 にアプリをデプロイしています。

0 投票する
7 に答える
28800 参照

java - kafka 8 とメモリ - Java ランタイム環境を続行するにはメモリが不足しています

512 メガバイトの RAM で DigiOcean インスタンスを使用しています。kafka で以下のエラーが発生します。私は Java に精通した開発者ではありません。少量のラムを利用するようにカフカを調整するにはどうすればよいですか。これは開発サーバーです。より大きなマシンに 1 時間あたりの料金を払いたくありません。

0 投票する
1 に答える
3721 参照

hadoop - カフカ 0.8 で camus サンプルを実行する

私は camus を初めて使用し、これまでのところ kafka 0.8 で試して使用したいと思います。例のように 2 つのキューを作成したソースをダウンロードし、ジョブ構成ファイルを構成し (以下を参照)、自分のマシンで実行しようとしました (詳細は以下を参照)。 ) このコマンドで

jar には、shade ファイルなどのすべての依存関係が含まれています

そして、私はこのエラーが発生しています:

intellij-idea エディターで実行しようとしたときにエラーが発生しましたが、エラーの理由が見つかりました

私が間違っていることを説明してもらえますか?

camus 設定ファイル

機械詳細:

hortonworks - hdp 2.0.0.6 と kafka 0.8 ベータ 1

0 投票する
1 に答える
1294 参照

rabbitmq - データベースのレプリケーションに Message Broker を使用する (現在は RabbitMQ )

システムのデータが変更されると、すべての変更を少なくとも 4 つの異なるコンシューマー (1 秒あたり約 3000 メッセージ) に公開するので、メッセージ ブローカーを使用したいと考えています。
コンシューマのほとんどは、データベース テーブルを変更して更新する責任があります。

(DBは異なります-カウチ、mysqlなど、独自のレプリケーションメカニズムを使用したり、dbトリガーを使用したりするなどのソリューションは不可能です)質問

  1. メッセージブローカーを使用して DB 間でデータを複製した経験のある人はいますか?
    それは良い習慣ですか?

  2. 失敗した場合はどうすればよいですか?
    たとえば、RabbitMQ を使用して、クライアントがキューから 10,000 件のメッセージを削除し、ACK を送信し、それらを処理する前に毎回例外をスローしたとします。今、それらは失われています。キューに戻る方法はありますか?

    (それらを再キューイングすると、順序が台無しになります)。

  3. rabbitMQ を使用することは良い習慣ですか? Kafka のようにキューに戻る機能は、シナリオを失敗させるために重要ではありませんか?

ありがとう。

0 投票する
4 に答える
21469 参照

apache-kafka - Kafka の高レベル コンシューマは、Java API を使用してトピックからすべてのメッセージを取得します (--from-beginning と同等)

Kafka サイトの ConsumerGroupExample コードを使用して、Kafka High Level Consumer をテストしています。Kafka サーバー構成にある「test」というトピックに関する既存のメッセージをすべて取得したいと思います。他のブログを見ると、auto.offset.reset を「最小」に設定して、すべてのメッセージを取得できるようにする必要があります。

私が実際に持っている質問は次のとおりです。高レベルのコンシューマーに対する同等の Java API 呼び出しは何ですか。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning