を使用してストリーミング データを に書き込みConfluent
ます。ユーザーマニュアルに従い、コネクタのクイックスタートとセットアップを行いました。トピックを 1 つだけ使用すると、正常に動作します。私のプロパティファイルは次のようになりますHDFS Connector
HDFS
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_topic1
hdfs.url=hdfs://localhost:9000
flush.size=30
複数のトピックを追加すると、オフセットが継続的にコミットされ、コミットされたメッセージが書き込まれません。
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
topics=test_topic1,test_topic2
hdfs.url=hdfs://localhost:9000
flush.size=30
1と2でtasks.maxを試しました。以下のように連続してCommitting offsets
ログに記録されます
[2016-10-26 15:21:30,990] INFO Started recovery for topic partition test_topic1-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-10-26 15:21:31,222] INFO Finished recovery for topic partition test_topic1-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-10-26 15:21:31,230] INFO Started recovery for topic partition test_topic2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-10-26 15:21:31,236] INFO Finished recovery for topic partition test_topic2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-10-26 15:21:35,155] INFO Reflections took 6962 ms to scan 249 urls, producing 11712 keys and 77746 values (org.reflections.Reflections:229)
[2016-10-26 15:22:29,226] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:23:29,227] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:24:29,225] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:25:29,224] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
サービスを正常に停止すると (Ctrl+C)、tmp
ファイルが削除されます。私は何を間違っていますか?それを行う適切な方法は何ですか?これに関する提案をお待ちしております。