MySqlからKsqlへのデータ パイプラインを構築しようとしています。
ユース ケース: データ ソースは MySql です。MySqlでテーブルを作成しました。
私は使っている
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
スタンドアロン コネクタを開始します。そして、それはうまく機能しています。
トピック名でコンシューマーを開始しています。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning
MySQLテーブルにデータを挿入すると、コンシューマーでも結果が得られます。同じトピック名で KSQL Stream を作成しました。Kstreamでも同じ結果が期待されていますが、実行しているときに結果が得られません
select * from <streamName>
コネクタ構成 -- source-quickstart-mysql.properties
name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera
#comment=Which table(s) to include
table.whitelist=ftest
mode=incrementing
incrementing.column.name=id
topic.prefix=ftopic
サンプルデータ
- MySql
1.) データベースの作成:
CREATE DATABASE testDB;
2.) データベースを使用する:
USE testDB;
3.) テーブルを作成します。
CREATE TABLE products (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
4.) テーブルにデータを挿入します。
INSERT INTO products(id,name,description,weight)
VALUES (103,'car','Small car',20);
- KSQL
1.) ストリームを作成します。
CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');
2.) クエリを選択します。
Select * from pro_original;
期待される出力
- 消費者
MySQL テーブルに挿入されたデータを取得します。
ここでは、MySQL でデータを取得しています。
- Ksql
Mysql テーブルに挿入され、Kafka トピックに反映されるインストリーム データを入力する必要があります。
ksql で期待される結果が得られません
このデータ パイプラインを手伝ってください。