問題タブ [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
2015 参照

json - Kafka Connect with MongoDB Connector

MongoDB シンク コネクタ (コネクタ)で Apache Kafka Connect を使用しようとしました

Avro 形式を使用すると、1 つの問題を除いて機能しましたが、コネクタが新しい Mongo レコードの _id に record.kafkaOffset() を使用するため、1 つのパーティションでトピックを作成する必要がありました (したがって、複数のパーティションでは、異なるレコードに対して同じ ID を取得します)。記録)。

どうすれば修正できますか?

Jsonでテストしたいので、そのための新しいトピックを作成しました。そして、コンバーターの構成を JsonConverter に変更しました。実行すると、次のエラーが表示されます。

java.lang.ClassCastException: org.apache.kafka.connect.mongodb.MongodbSinkTask.put(MongodbSinkTask.java:106) で java.util.HashMap を org.apache.kafka.connect.data.Struct にキャストできません。 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176) の org.apache.kafka.connect.runtime. WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90) org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58) org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java: 82)

このコネクタは Json で動作しますか?

0 投票する
0 に答える
333 参照

java - Kafka と Java を使用するワーカーを含むデータ パイプラインをプログラムまたは動的に作成するにはどうすればよいですか?

非常に高いレベルでは、マルチテナント プラットフォーム インターフェイスを提供して、ユーザーが「プロジェクト」を作成できるようにします。これは、RSS フィード、SQL Server クエリ、およびファイルの場所など、さまざまなプロデューサー タイプの Kafka トピックに変換されます。

たとえば、RSS データを取得するには、Kafka を作成してスケジュールに従って RSS データをフェッチする必要があると思います。そのため、構成ファイルではなく、新しいマルチスレッド ワーカー (またはワーカーへのジョブ) を動的に追加する必要があります。

アプリケーション抽象化レイヤーは、ユーザーを Kafka および ElasticSearch から分離します。コントロール センターからすべてのユーザーにアクセスを許可することはできません。

ある種のマネージド サービスを提供する別のユース ケースでは、CC を使用できます。

この使用法を考慮して、これを達成するための適切な方法について詳細を取得したいと思います-まだいくつかのギャップがあるようです-つまり、トピックを動的に作成する必要があるため、構成ファイルを使用できません。

Kafka と Java を使用してデータ パイプラインとワーカーをプログラムまたは動的に作成するにはどうすればよいですか?

0 投票する
1 に答える
1868 参照

streaming - Zeppelin 6.5 + 構造化ストリーミング 2.0.2 用の Apache Kafka コネクタ

Spark の構造化ストリーミングの例と Kafka コネクタを含む zeppelin ノートブックを実行しようとしています。

ここに私の環境があります:

私のツェッペリンノートブックのコードは次のとおりです。

ノートブックを実行すると、次のエラーが表示されます。

import org.apache.spark.sql.functions.{explode, split} java.lang.ClassNotFoundException: データ ソースが見つかりませんでした: kafka。https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projectsでパッケージを見つけて ください。 org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) で org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) で org .apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) org.apache.spark で.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) at org.apache.spark.sql .execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) ... 86 省略原因: java.lang.ClassNotFoundException :カフカ。DefaultSource の scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62) の java.lang.ClassLoader.loadClass(ClassLoader.java:424) の java.lang.ClassLoader.loadClass(ClassLoader.java:357) org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) で org.apache.spark.sql.execution.datasources.DataSource$$anonfun で$5$$anonfun$apply$1.apply(DataSource.scala:132) で scala.util.Try$.apply(Try.scala:192)132) org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) で scala.util.Try$.apply(Try.scala:192) )132) org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) で scala.util.Try$.apply(Try.scala:192) )

ヘルプアドバイスをいただければ幸いです。

thnx

0 投票する
0 に答える
231 参照

hdfs - sparkペアrddをファイルとしてHDFSに保存する方法は?

こんにちは、3 つのパーティションと 2 つのレプリカを持つ kafka トピックを作成しました。kafka から Spark ストリーミングにメッセージ/レコードを公開して (何らかのプロセスで)、データを HDFS に保存しようとしています。ペア RDD をテキスト ファイルとして保存しようとしましたが、うまくいきません。

このコードは機能しません。

コンソール出力:

実際、私のpom.xml

0 投票する
0 に答える
1328 参照

elasticsearch - logstash kafka 入力パフォーマンス / 構成チューニング

logstash を使用して Kafka から Elasticsearch にデータを転送すると、次のエラーが発生します。

セッションのタイムアウト (30000) と最大ポーリング レコード (250) を調整しようとしました。

このトピックは、1 秒あたり 1000 個のイベントを avro 形式で生成します。10 のパーティション (2 つのサーバー) と、それぞれ 5 つのコンシューマー スレッドを持つ 2 つの logstash インスタンスがあります。

1 秒あたり 100 ~ 300 件のイベントがある他のトピックについては問題ありません。

同じトピックで Kafka と Elasticsearch の間に 2 つ目のコネクタもあり、正常に動作するため、構成の問題であると思います (confluent の kafka-connect-elasticsearch)

主な目的は、kafka connect と logstash をコネクタとして比較することです。多分誰かが一般的な経験も持っていますか?