1

MySqlからKsqlへのデータ パイプラインを構築しようとしています。

ユース ケース: データ ソースは MySql です。MySqlでテーブルを作成しました。

私は使っている

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties  ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties 

スタンドアロン コネクタを開始します。そして、それはうまく機能しています。

トピック名でコンシューマーを開始しています。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning

MySQLテーブルにデータを挿入すると、コンシューマーでも結果が得られます。同じトピック名で KSQL Stream を作成しました。Kstreamでも同じ結果が期待されていますが、実行しているときに結果が得られません

select * from <streamName>

コネクタ構成 -- source-quickstart-mysql.properties

    name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera

#comment=Which table(s) to include
table.whitelist=ftest

mode=incrementing
incrementing.column.name=id

topic.prefix=ftopic

サンプルデータ

  • MySql

1.) データベースの作成:

CREATE DATABASE testDB;

2.) データベースを使用する:

USE testDB;

3.) テーブルを作成します。

    CREATE TABLE products (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);

4.) テーブルにデータを挿入します。

    INSERT INTO products(id,name,description,weight)
  VALUES (103,'car','Small car',20);
  • KSQL

1.) ストリームを作成します。

CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');

2.) クエリを選択します。

Select * from pro_original;

期待される出力

  1. 消費者

MySQL テーブルに挿入されたデータを取得します。

ここでは、MySQL でデータを取得しています。

  1. Ksql

Mysql テーブルに挿入され、Kafka トピックに反映されるインストリーム データを入力する必要があります。

ksql で期待される結果が得られません

このデータ パイプラインを手伝ってください。

4

1 に答える 1

2

データは AVRO 形式ですが、定義したのではVALUE_FORMATなく. トピックに格納される値の形式を KSQL に指示することが重要です。以下はあなたのためにトリックを行うはずです。AVRODELIMITED

CREATE STREAM pro_original_v2 \ 
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='AVRO');

実行にkafkaトピックに挿入されたデータ

SELECT * FROM pro_original_v2;

ksql コンソール ウィンドウに表示されるはずです。

ここで、KSQL での Avro の例をいくつか見ることができます。

于 2018-10-23T09:32:54.097 に答える