3

Confluent プラットフォームが提供する kafka-hdfs-connector を使用して、Kafka から Hive テーブルにデータをコピーしようとしています。私はそれをうまく行うことができましたが、時間間隔に基づいて着信データをバケット化する方法を考えていました。たとえば、5 分ごとに新しいパーティションを作成したいと考えています。

partition.duration.msでio.confluent.connect.hdfs.partitioner.TimeBasedPartitionerを試しましたが、間違った方法でやっていると思います。Hive テーブルには、すべてのデータがその特定のパーティションに入る 1 つのパーティションしか表示されません。このようなもの :

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03

そして、すべての avro オブジェクトがこのパーティションにコピーされます。

代わりに、次のようなものが欲しいです:

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03
year=2016/month=03/day=15/hour=19/minute=08
year=2016/month=03/day=15/hour=19/minute=13

最初に、コネクタはパスyear=2016/month=03/day=15/hour=19/minute=03を作成し、次の 5 分間、すべての受信データをこのディレクトリにコピーし続け、6 分の開始時に新しいパス、つまりyear=2016/month=03/day=15/hour=19/minute=08を作成し、次の 5 分間のデータをこのディレクトリにコピーする必要があります。

これは私の設定ファイルがどのように見えるかです:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
hdfs.url=hdfs://localhost:9000
flush.size=3
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=300000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/
locale=en
timezone=GMT
logs.dir=/kafka-connect/logs
topics.dir=/kafka-connect/topics
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD

誰かが私を正しい方向に向けることができれば、それは本当に役に立ちます。必要に応じて、詳細を共有していただければ幸いです。この質問を終わりのないもののように見せたくない.

どうもありがとう!

4

1 に答える 1

7

path.format の分のフィールドが間違っています:

path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/

そのはず:

path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm/
于 2016-07-21T15:03:55.407 に答える