問題タブ [apache-kafka-connect]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
elasticsearch - Kafka-Connect vs Filebeat & Logstash
私は Kafka から消費し、Hadoop と Elasticsearch にデータを保存しようとしています。私は現在これを行う 2 つの方法を見てきました: Filebeat を使用して Kafka から消費し、それを ES に送信する方法と、Kafka-Connect フレームワークを使用する方法です。Kafka-Connect-HDFS および Kafka-Connect-Elasticsearch モジュールがあります。
ストリーミング データの送信にどちらを使用すればよいかわかりません。ある時点で Kafka からデータを取得して Cassandra に配置したい場合は、そのために Kafka-Connect モジュールを使用できますが、Filebeat にはそのような機能はありません。
apache-kafka-connect - kafka-connect フレームワークが 1 回限りのセマンティクスを実現するために必要な設定
Kafka-Connect では、.properties ファイルにいくつかのパラメーターを指定して、1 回限りのセマンティクスを有効にできるという印象を受けました。
これらの設定を見つけることができませんでした。
しかし、
https ://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md のようにこれを達成する他の方法を見つけました。 /カフカ/よくある質問
カフカ接続の設定を変更することで、一度だけのセマンティクスを達成することは可能ですか?
apache-kafka - avroスキーマをkafkaで一度だけ送信する方法
次のコードを使用しています (実際にはそうではありませんが、仮定しましょう) スキーマを作成し、プロデューサーによって kafka に送信します。
問題は、コードにより、このスキーマで 1 つのメッセージしか送信できないことです。次に、次のメッセージを送信するためにスキーマ名を変更する必要があります...そのため、名前文字列は現在ランダムに生成されているため、さらにメッセージを送信できます。これはハックなので、これを行う適切な方法を知りたいです。
また、スキーマなしでメッセージを送信する方法も調べました (つまり、スキーマを含む 1 つのメッセージを既に kafka に送信したので、他のすべてのメッセージはもうスキーマを必要としません) - ただしnew GenericData.Record(..)
、スキーマ パラメーターが必要です。null の場合、エラーがスローされます。
では、avro スキーマ メッセージを kafka に送信する正しい方法は何ですか?
これは別のコードサンプルです - 私のものとかなり同じです
: /confluent/examples/producer/ProducerExample.java
また、スキーマを設定せずに送信する方法も示していません。
apache-kafka - コネクタ内で ID/サブジェクトによって登録されたスキーマを取得します
ソース コネクタで登録済みのスキーマを取得することはできますか? スキーマ レジストリに登録されている静的スキーマがあり、そのスキーマをコネクタから取得して SourceRecord に渡す必要があります。
schema id を使用してスキーマを取得するために CachedSchemaRegistryClient を使用しましたが、Avro スキーマが返されました。したがって、これを org.apache.kafka.connect.data.Schema に変換する必要があります。これに関するいくつかの指針を提供してください。
または、 CachedSchemaRegistryClient を使用せずに直接同じことができるコネクタの他の方法はありますか?
ありがとう、スリージット
jms - Kafka コネクタ - Kafka トピックの JMSSourceConnector
Confluent は、デフォルトでこの JMSSourceConnector for Kafka トピックを提供します。
それとも、このためにカスタム コネクタを作成する必要がありますか?
これに関する Confluent ページにドキュメントはありません。
apache-kafka - Kafka トピック ログの保持を永続的にする
ログ メッセージを Kafka トピックに書き込んでいますが、このトピックを永続的に保持したいと考えています。Kafka および Kafka Connect (_schemas、connect-configs、connect-status、connect-offsets など) で、ログ保持時間までに削除されない特別なトピックがあることを確認しました。トピックをこれらの他の特別なトピックのように強制するにはどうすればよいですか? それは命名規則ですか、それとも他のプロパティですか?
ありがとう
java - Kafka Connect : 構造体からネストされたフィールドを取得する方法
Kafka-Connect を使用して Kafka-Elasticsearch コネクタを実装しています。
プロデューサーは複雑な JSON を Kafka トピックに送信し、コネクタ コードはこれを使用して Elastic Search に永続化します。コネクタは構造体 ( https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html )の形式でデータを取得します。
トップレベルの Json で構造体のフィールド値を取得できますが、ネストされた json からフェッチすることはできません。
「op」は解析できますが、「test.test.employee.Value」は解析できません。
hadoop - マルチ DC セットアップでホスト名の代わりにプライベート IP に接続する kafka HDFS コネクタ
2 つのクラスターがあります。
- コンフルエントな家の中の1つ(3.0.0-1)
- AWS に 1 つ、hadoop (hdp 2.4) を使用
hdfs コネクタを使用して、コンフルエントから Hadoop に書き込もうとしています。
簡単に言えば、コネクタは、ホスト名を使用する代わりに、hadoop クラスターのプライベート IP に接続しようとします。社内クラスターでは、/etc/hosts が更新され、内部 Hadoop ホスト名が関連するパブリック IP に解決されます。
私は分散コネクタを使用しています。次のようなコネクタ JSON ファイルがたくさんあります。
ワーカーは次のように定義されます。
いくつかのメモ:
- /kafka-connect は hdfs に存在し、誰でも書き込み可能
- 3 つのトピック (*.storage.topic) は存在します
- Kafkaブローカーを使用して各(3)サーバーで1つのワーカーを実行しています(すべてのブローカーにスキーマレジストリ、残りのAPI、およびzookeeperサーバーがあります)
- dfs.client.use.datanode.hostname を true に設定しました。このプロパティはクライアントの $HADOOP_HOME/hdfs-site.xml に設定されています。
/kafka-connect のサブディレクトリとハイブ メタデータが作成されていることがわかります。コネクタを起動すると、次のメッセージが表示されます。
createBlockOutputStream での INFO 例外 (org.apache.hadoop.hdfs.DFSClient:1471) org.apache.hadoop.net.ConnectTimeoutException: チャネルの接続準備が整うまでの待機中に 60000 ミリ秒のタイムアウトが発生しました。ch : java.nio.channels.SocketChannel[org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533) の接続保留リモート org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java: 1610) org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408) org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361) org.apache.hadoop .hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588) INFO 破棄 BP-429601535-10.0.0.167-1471011443948:blk_1073742319_1495 (org.apache.hadoop.hdfs.DFSClient:
これを修正する方法について何か考えはありますか? コンフルエントは、ホスト名ではなく IP を直接受け取るようです。