問題タブ [apache-kafka-connect]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka-Connect API で max.poll.records を設定する方法
confluent-3.0.1 プラットフォームを使用し、Kafka-Elasticsearch コネクタを構築しています。このために、SinkConnector と SinkTask (Kafka 接続 API) を拡張して、Kafka からデータを取得します。
このコードの一部として、SinkConnector の taskConfigs メソッドを拡張して「max.poll.records」を返し、一度に 100 レコードのみを取得しています。しかし、それは機能せず、すべてのレコードを同時に取得していますが、規定の時間内にオフセットをコミットできません。「max.poll.records」を設定するのを手伝ってください。
apache-kafka - ソースだけでなくシンクでもある Kafka Connect の設計方法
トピックにサブスクライブし、メッセージを変換し、変換されたメッセージを別のトピックにプッシュする Kafka-Connector を開発しています。
これまでのところ、私はそれをSinkTask
クラスとして実装しており、各タスクは ETL を実行し、パブリッシャー オブジェクトをインスタンス化し、別のトピックにメッセージを書き戻します。
それを実装するよりクリーンな方法はありますか?これは、コネクタがソースだけでなくシンクでもある一般的な使用例だと思います。
apache-kafka - Kafka Connect で使用できる REST API は何ですか?
Kafka Connect の REST API に関する 2 つのリンクを見つけました。https://kafka.apache.org/documentation#connectに追加のものがあります。GET /connectors/{name}/status
またはを使用すると 404 エラーが発生しますPOST /connectors/{name}/restart
。ただし、これらのサービスはhttp://docs.confluent.io/2.0.0/connect/userguide.html#rest-interfaceには記載されていません。
コネクタが正しく実行されているかどうかを確認するにはどうすればよいですか?
前もって感謝します!
kafka-consumer-api - KafkaStream がトピックからメッセージを受信しない
私は遊んでいてKafkaStreams
、KafkaConnect
単にトピックからのメッセージを消費しようとしています。このトピック用に「標準」のバッチ コンシューマーをセットアップしましたが、これは魅力的に機能します。最初にいくつかのレコードを Kafka に送信し、後でそれらを使用します。今、私はKakfaストリームを使用して同じことをしたいのですが、トピックから単一のメッセージを受け取りません. これが私が使用している消費者コードです。
私の問題は、私のコードがit.hasNext()
タイムアウトに達するまで待機し続けることです。ここで詳細が欠けている可能性がありますが、なぜこのトピックから何も得られないのかわかりません。このテストの一環として、コンシューマが開始する直前にこのトピックに多数のレコードを送信するプロデューサーがあるため、オフセットの問題になることはありません。どんなアイデアでも大歓迎です。
elasticsearch - Kafka Connect の Elasticsearch コネクタ - オフセットとタイムスタンプ
カフカ接続にelasticsearchコネクタ(コンフルエント)を使用しています。トピックからelasticsearchインデックスにメッセージを取得しています。キーが使用されていない場合、オフセットは _id の一部であることがわかります。オフセットとタイムスタンプに基づいて、elasticseach のメッセージを閲覧したいと考えています。
メッセージのオフセットとタイムスタンプをインデックスのフィールドとして取得することは可能ですか?
これがelasticsearchドキュメントです
{
"_index": "test-elasticsearch-sink",
"_type": "kafka-connect",
"_id": "test-elasticsearch-sink+0+0",
"_score": 1,
"_source": {
"f1": "value1"
}
}
ありがとう、ラジェッシュ
hadoop - kafka-connect を使用した複数のハイブ パーティション
プロセス中に、ハイブ統合をオンにして、kafka-connect を使用して HDFS にデータをストリーミングしようとしました。
私のユースケースでは、「FieldPartioner」をパーティショナー クラスとして使用する必要があります。
私の問題は、複数のパーティションを取得できないことです。
例:
私の例のJSON
「mydate」と「hour」に基づいてパーティションを作成したい
私は次のことを試しました
また、partition.field.nameを次のように指定してみました
と
そして、より多くのそのような組み合わせ
この問題に関するヘルプは大歓迎です
ありがとう。