3

を使用してストリーミング データを に書き込みConfluentます。ユーザーマニュアルに従い、コネクタのクイックスタートとセットアップを行いました。トピックを 1 つだけ使用すると、正常に動作します。私のプロパティファイルは次のようになりますHDFS ConnectorHDFS

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_topic1
hdfs.url=hdfs://localhost:9000
flush.size=30

複数のトピックを追加すると、オフセットが継続的にコミットされ、コミットされたメッセージが書き込まれません。

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
topics=test_topic1,test_topic2
hdfs.url=hdfs://localhost:9000
flush.size=30

1と2でtasks.maxを試しました。以下のように連続してCommitting offsetsログに記録されます

[2016-10-26 15:21:30,990] INFO Started recovery for topic partition test_topic1-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-10-26 15:21:31,222] INFO Finished recovery for topic partition test_topic1-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-10-26 15:21:31,230] INFO Started recovery for topic partition test_topic2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-10-26 15:21:31,236] INFO Finished recovery for topic partition test_topic2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-10-26 15:21:35,155] INFO Reflections took 6962 ms to scan 249 urls, producing 11712 keys and 77746 values  (org.reflections.Reflections:229)
[2016-10-26 15:22:29,226] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:23:29,227] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:24:29,225] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:25:29,224] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)

サービスを正常に停止すると (Ctrl+C)、tmpファイルが削除されます。私は何を間違っていますか?それを行う適切な方法は何ですか?これに関する提案をお待ちしております。

4

1 に答える 1

2

私はあなたがここで言及したのと同じ問題に過去 1 か月ほどつまずき続けていましたが、今日、confluent 3.1.1にアップグレードして期待どおりに動作するようになるまで、その根底にたどり着くことができませんでした。 ...

これが私が転がる方法です

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=5
topics=accounts,contacts,users
hdfs.url=hdfs://localhost:9000
flush.size=1
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
于 2016-12-08T12:04:35.640 に答える