問題タブ [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
1507 参照

apache-kafka - Kafka Connect: コネクタとコンシューマーを実行した後、test.sink.txt が更新されない

以下の手順を使用して、Kafka connect-test トピックにデータを入力しようとしました。トピックを 1 回使用することができましたが、新しいファイルで 2 回目の試みを行ったところ、データを使用できませんでした。出力ファイルtest.sink.txtが更新されない

  1. echo -e "foo\nbar" > test.txt

  2. bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

  3. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning

0 投票する
1 に答える
876 参照

apache-kafka - Kafka Connect SinkTask を拡張し、指定されたオフセットから消費を開始する

SinkTask独自のシンク コネクタを作成するために拡張したいと考えています。

フラッシュ中にオフセットを保存し、次にシンク コネクタを起動したときに、保存したオフセットから読み取りを再開したい場合、正しい方法は何ですか?

SinkTaskContextオーバーライドされたのを使用してinitialize(SinkTaskContext context)、独自のオフセットを割り当ててみました。

ただし、パーティションがまだ割り当てられていないため、これは機能しません。例外が発生していました。

次に、コンテキスト ( from initialize()) をグローバル変数に保存し、それを使用してメソッド内でオフセットを割り当てopen(Collection<TopicPartition> partitions)(オーバーライドされた from SinkTask)、内部で行っていたのと同じ方法で使用する必要がありinitializeますか? 例えば:

0 投票する
2 に答える
4850 参照

apache-kafka - Azure Blob Storage 用の Kafka コネクタ

Kafka にプッシュされたメッセージを深いストレージに保存する必要があります。私たちは Azure クラウド サービスを使用しているので、Azure Blob Storage の方が優れた選択肢になると思います。Kafka Connect のシンク コネクタ API を使用して、データを Azure Blob にプッシュしたいと考えています。Kafka のドキュメントでは、ほとんどの場合、HDFS でデータをエクスポートすることを提案していますが、その場合、Hadoop を実行する Linux VM が必要であり、コストがかかると思います。私の質問は、Azure Blob Storage が JSON オブジェクトを格納するための適切な選択であり、カスタム シンク コネクタを構築することがこの場合の合理的な解決策でしょうか?

0 投票する
2 に答える
1072 参照

java - Kafka 接続 API クライアント

kafka から hdfs にデータを書き込もうとしています。Confluent の kafka-connect-hdfs Java API の使用方法はどこにも文書化されていません。

0 投票する
1 に答える
925 参照

apache-kafka - Kafka Connect シンク タスクで put() がトリガーされる頻度はどれくらいですか?

put()Kafka Connect Sink タスクのメソッドがトリガーされる間隔を制御できますか? この点で、Kafka Connect フレームワークの予想される動作は何ですか? 理想的には、たとえば、「X 件の新しいレコード/Y 件の新しいバイトがない限り、または最後の呼び出しから Z ミリ秒経過していない限り、私に電話しないでください」と指定したいと思います。これにより、シンク タスク内のバッチ処理ロジックがより単純になる可能性があります (ドキュメントを引用すると、「多くの場合、内部バッファリングが役立つため、レコードのバッチ全体を一度に送信できるため、ダウンストリーム データ ストアにイベントを挿入するオーバーヘッドが削減されます) 。 .

0 投票する
1 に答える
1667 参照

apache-kafka - Kafka 接続のチュートリアルが機能しなくなった

このリンクのステップ 7 (Kafka Connect を使用してデータをインポート/エクスポートする) に従っていました。

http://kafka.apache.org/documentation.html#quickstart

「test.txt」ファイルを削除するまではうまくいきました。主な理由は、log4j ファイルがそのように機能するためです。一定の時間が経過すると、ファイルはローテーションされます。つまり、名前が変更され、同じ名前の新しいファイルが書き込まれ始めます。

しかし、「test.txt」を削除した後、コネクタが機能しなくなりました。コネクタ、ブローカー、zookeeper などを再起動しましたが、'test.txt' の新しい行が 'connect-test' トピックに移動せず、したがって 'test.sink.txt' ファイルに移動しません。

どうすればこれを修正できますか?

0 投票する
1 に答える
811 参照

apache-kafka - Kafka Connect HDFS シンクの問題

HDFS Sink Connector で Kafka-Connect を使用してデータをストリーミングしようとしています。スタンドアロン モードと分散モードの両方が正常に動作していますが、HDFS への書き込みは 1 回のみ (フラッシュ サイズに基づく) であり、後でストリーミングすることはありません。何か足りない場合は助けてください。

コンフルエント 2.0.0 & カフカ 0.9.0