私は Apache Flume を使用するのが初めてで、それがどのように機能するかを正確に理解するのは困難です。私の問題を説明するために、私は自分の必要性と私がやったことを説明します。
csv ファイルのディレクトリ (これらのファイルは 5 分ごとに作成されます) と HDFS クラスターの間のストリームを構成したいと考えています。
「スプーリング ディレクトリ」ソースと HDFS シンクが必要であることがわかりました。それは私にこのflume.confファイルを与えます
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat=Text
#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
その結果、ローカル ファイル システムで入力ファイルの名前が「.complete」に変更され、HDFS にデータがアップロードされ、Flume によって生成された一意の新しい名前が付けられます。
それはほとんど私が必要としていたものです。
しかし、アップロードする前に、ファイル固有の操作 (ヘッダーの削除、カンマのエスケープなど) を行いたいと考えています。方法がわかりません。インターセプターを使用することを考えています。しかし、データがフルームにある場合、イベントで変換され、ストリーミングされます。彼の時点では、ファイルの知識はありません。
そうしないと、元の時間イベントがファイル名に書き込まれるため、現在の日付ではなく、この時間をイベントに関連付けたいと思います。
また、元のファイル名を hdfs に保持したい (そこにはいくつかの有用な情報があります)。
誰でも私を助けるためのアドバイスがありますか?