0

編集*: 完全な構成ファイルは次のとおりです。

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.consumer.timeout.ms = 20000000
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList= ip.address:9092
tier1.channels.channel1.topic= test1
tier1.channels.channel1.zookeeperConnect=ip.address:2181
tier1.channels.channel1.parseAsFlumeEvent=false

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/
tier1.sinks.sink1.hdfs.rollInterval = 5000
tier1.sinks.sink1.hdfs.rollSize = 5000
tier1.sinks.sink1.hdfs.rollCount = 1000
tier1.sinks.sink1.hdfs.idleTimeout= 10
tier1.sinks.sink1.hdfs.maxOpenFiles=1
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

最近までidleTimeoutとmaxOpenFilesはありませんでした。そのため、これら 2 つのオプションのデフォルト構成でも機能しませんでした。

Flume を使用して Kafka データを集計することに関する質問。現在、Flume はストリーミング データを読み込むために毎秒新しいファイルを作成しています。これらは私の設定です:

tier1.sinks.sink1.hdfs.rollInterval = 500 (should be 500 seconds)
tier1.sinks.sink1.hdfs.rollSize = 5000 (should be bytes)
tier1.sinks.sink1.hdfs.rollCount = 1000 (number of events)

私が完全に確信していない1つの設定はrollCountなので、いくつかの追加情報:

私は 80 バイト/秒を取得しています。いくつかのファイルは 80 バイトで 2 つのメッセージがあり、いくつかは 160 バイトですが、4 つのメッセージがあります。時間やサイズに基づいて実行していないので、カウントする必要があるかもしれませんが、そのような小さなメッセージが1000イベントとして登録される理由がわかりませんか?

お手伝いありがとう!

4

1 に答える 1

0

rollInterval はミリ秒でしょうか? 私は以前にこの問題を抱えていた可能性があると思います。

于 2015-02-27T05:23:00.083 に答える