ソースとシンクの間の速度のギャップのため、flume をテストして hHase にデータをロードし、flume のセレクターとインターセプターを使用して並列データロードを検討しています。
だから、私がflumeでやりたいことは
インターセプターの regex_extractor タイプでイベントのヘッダーを作成する
セレクターの多重化タイプを使用して、ヘッダー付きのイベントを 3 つ以上のチャネルに多重化する
1 つのソース チャネル シンクで。
以下のように設定してみました。
agent.sources = tailsrc
エージェント.チャネル = mem1 mem2
agent.sinks = std1 std2
agent.sources.tailsrc.type = exec
agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
agent.sources.tailsrc.batchSize = 1
agent.sources.tailsrc.interceptors = i1
agent.sources.tailsrc.interceptors.i1.type = regex_extractor
agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
agent.sources.tailsrc.interceptors.i1.serializers = t1
agent.sources.tailsrc.interceptors.i1.serializers.t1.name = タイプ
agent.sources.tailsrc.selector.type = 多重化
agent.sources.tailsrc.selector.header = タイプ
agent.sources.tailsrc.selector.mapping.1 = mem1
agent.sources.tailsrc.selector.mapping.2 = mem2
agent.sinks.std1.type = file_roll
agent.sinks.std1.channel = mem1
agent.sinks.std1.batchSize = 1
agent.sinks.std1.sink.directory = /var/log/flumeout/1
agent.sinks.std1.rollInterval = 0
agent.sinks.std2.type = file_roll
agent.sinks.std2.channel = mem2
agent.sinks.std2.batchSize = 1
agent.sinks.std2.sink.directory = /var/log/flumeout/2
agent.sinks.std2.rollInterval = 0
agent.channels.mem1.type = メモリ
agent.channels.mem1.capacity = 100
agent.channels.mem2.type = メモリ
agent.channels.mem2.capacity = 100
しかし、それはうまくいきません!
セレクター部分を削除すると、flume のログにインターセプターのデバッグ メッセージが表示されます。しかし、セレクターとインターセプターが一緒の場合は何もありません。
間違った表現や私が見逃したものはありますか?
読んでくれてありがとう。:)