編集*: 完全な構成ファイルは次のとおりです。
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.consumer.timeout.ms = 20000000
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList= ip.address:9092
tier1.channels.channel1.topic= test1
tier1.channels.channel1.zookeeperConnect=ip.address:2181
tier1.channels.channel1.parseAsFlumeEvent=false
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/
tier1.sinks.sink1.hdfs.rollInterval = 5000
tier1.sinks.sink1.hdfs.rollSize = 5000
tier1.sinks.sink1.hdfs.rollCount = 1000
tier1.sinks.sink1.hdfs.idleTimeout= 10
tier1.sinks.sink1.hdfs.maxOpenFiles=1
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
最近までidleTimeoutとmaxOpenFilesはありませんでした。そのため、これら 2 つのオプションのデフォルト構成でも機能しませんでした。
Flume を使用して Kafka データを集計することに関する質問。現在、Flume はストリーミング データを読み込むために毎秒新しいファイルを作成しています。これらは私の設定です:
tier1.sinks.sink1.hdfs.rollInterval = 500 (should be 500 seconds)
tier1.sinks.sink1.hdfs.rollSize = 5000 (should be bytes)
tier1.sinks.sink1.hdfs.rollCount = 1000 (number of events)
私が完全に確信していない1つの設定はrollCountなので、いくつかの追加情報:
私は 80 バイト/秒を取得しています。いくつかのファイルは 80 バイトで 2 つのメッセージがあり、いくつかは 160 バイトですが、4 つのメッセージがあります。時間やサイズに基づいて実行していないので、カウントする必要があるかもしれませんが、そのような小さなメッセージが1000イベントとして登録される理由がわかりませんか?
お手伝いありがとう!