問題タブ [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
6905 参照

apache-kafka - Kafka-Connect API で max.poll.records を設定する方法

confluent-3.0.1 プラットフォームを使用し、Kafka-Elasticsearch コネクタを構築しています。このために、SinkConnector と SinkTask (Kafka 接続 API) を拡張して、Kafka からデータを取得します。

このコードの一部として、SinkConnector の taskConfigs メソッドを拡張して「max.poll.records」を返し、一度に 100 レコードのみを取得しています。しかし、それは機能せず、すべてのレコードを同時に取得していますが、規定の時間内にオフセットをコミットできません。「max.poll.records」を設定するのを手伝ってください。

0 投票する
1 に答える
81 参照

apache-kafka - ソースだけでなくシンクでもある Kafka Connect の設計方法

トピックにサブスクライブし、メッセージを変換し、変換されたメッセージを別のトピックにプッシュする Kafka-Connector を開発しています。

これまでのところ、私はそれをSinkTaskクラスとして実装しており、各タスクは ETL を実行し、パブリッシャー オブジェクトをインスタンス化し、別のトピックにメッセージを書き戻します。

それを実装するよりクリーンな方法はありますか?これは、コネクタがソースだけでなくシンクでもある一般的な使用例だと思います。

0 投票する
1 に答える
4893 参照

apache-kafka - Kafka Connect で使用できる REST API は何ですか?

Kafka Connect の REST API に関する 2 つのリンクを見つけました。https://kafka.apache.org/documentation#connectに追加のものがあります。GET /connectors/{name}/statusまたはを使用すると 404 エラーが発生しますPOST /connectors/{name}/restart。ただし、これらのサービスはhttp://docs.confluent.io/2.0.0/connect/userguide.html#rest-interfaceには記載されていません。

コネクタが正しく実行されているかどうかを確認するにはどうすればよいですか?

前もって感謝します!

0 投票する
1 に答える
221 参照

kafka-consumer-api - KafkaStream がトピックからメッセージを受信しない

私は遊んでいてKafkaStreamsKafkaConnect単にトピックからのメッセージを消費しようとしています。このトピック用に「標準」のバッチ コンシューマーをセットアップしましたが、これは魅力的に機能します。最初にいくつかのレコードを Kafka に送信し、後でそれらを使用します。今、私はKakfaストリームを使用して同じことをしたいのですが、トピックから単一のメッセージを受け取りません. これが私が使用している消費者コードです。

私の問題は、私のコードがit.hasNext()タイムアウトに達するまで待機し続けることです。ここで詳細が欠けている可能性がありますが、なぜこのトピックから何も得られないのかわかりません。このテストの一環として、コンシューマが開始する直前にこのトピックに多数のレコードを送信するプロデューサーがあるため、オフセットの問題になることはありません。どんなアイデアでも大歓迎です。

0 投票する
2 に答える
1021 参照

elasticsearch - Kafka Connect の Elasticsearch コネクタ - オフセットとタイムスタンプ

カフカ接続にelasticsearchコネクタ(コンフルエント)を使用しています。トピックからelasticsearchインデックスにメッセージを取得しています。キーが使用されていない場合、オフセットは _id の一部であることがわかります。オフセットとタイムスタンプに基づいて、elasticseach のメッセージを閲覧したいと考えています。

メッセージのオフセットとタイムスタンプをインデックスのフィールドとして取得することは可能ですか?

これがelasticsearchドキュメントです

{ "_index": "test-elasticsearch-sink", "_type": "kafka-connect", "_id": "test-elasticsearch-sink+0+0", "_score": 1, "_source": { "f1": "value1" } }

ありがとう、ラジェッシュ

0 投票する
1 に答える
695 参照

hadoop - kafka-connect を使用した複数のハイブ パーティション

プロセス中に、ハイブ統合をオンにして、kafka-connect を使用して HDFS にデータをストリーミングしようとしました。

私のユースケースでは、「FieldPartioner」をパーティショナー クラスとして使用する必要があります。

私の問題は、複数のパーティションを取得できないことです。

例:

私の例のJSON

「mydate」と「hour」に基づいてパーティションを作成したい

私は次のことを試しました

また、partition.field.nameを次のように指定してみました

そして、より多くのそのような組み合わせ

この問題に関するヘルプは大歓迎です

ありがとう。