Flume-ng から Google Cloud Storage にデータを書き込みたいと考えています。非常に奇妙な動作を観察したため、少し複雑です。説明させてください:
序章
バケットを使用するように設定された Google クラウド (ワンクリック) で Hadoop クラスターを起動しました。
マスターで ssh し、hdfs
コマンドを使用してファイルを追加すると、バケットですぐに確認できます
$ hadoop fs -ls /
14/11/27 15:01:41 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.2.9-hadoop2
Found 1 items
-rwx------ 3 hadoop hadoop 40 2014-11-27 13:45 /test.txt
しかし、コンピューターから追加して読み取ろうとすると、他の HDFS を使用しているようです。ここに というファイルを追加しましたjp.txt
が、以前のファイルが表示されませんtest.txt
$ hadoop fs -ls hdfs://ip.to.my.cluster/
Found 1 items
-rw-r--r-- 3 jp supergroup 0 2014-11-27 14:57 hdfs://ip.to.my.cluster/jp.txt
これは、HDFS を調べたときに表示される唯一のファイルでもありますhttp://ip.to.my.cluster:50070/explorer.html#/
Web コンソール ( https://console.developers.google.com/project/my-project-id/storage/my-bucket/ )を使用してバケット内のファイルを一覧表示すると、のみが表示され、 は表示test.txt
されませんjp.txt
。
Hadoop が Google Cloud Storage に接続できないことを読み、それに応じて Hadoop クライアントを構成した (かなり難しいことです) と、バケット内のアイテムが表示されるようになりました。しかし、そのためにはgs://
URIを使用する必要があります
$ hadoop fs -ls gs://my-bucket/
14/11/27 15:57:46 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.3.0-hadoop2
Found 1 items
-rwx------ 3 jp jp 40 2014-11-27 14:45 gs://my-bucket/test.txt
観察・中間結論
したがって、同じクラスタに 2 つの異なるストレージ エンジンがあるようです: 「従来の HDFS」( で始まるhdfs://
) と Google ストレージ バケット ( で始まるgs://
)。
ユーザーと権限は、ファイルをどこからリストするかによって異なります。
質問
主な質問は次のとおりです。flume を使用して Google Cloud Storage 上の HDFS/GS に書き込むために必要な最小限の設定は何ですか?
関連する質問
- 目標を達成するために、Google Cloud で Hadoop クラスタを起動する必要がありますか?
- Google Cloud Storage Bucket に直接書き込むことはできますか? はいの場合、どのようにflumeを構成できますか? (jar の追加、クラスパスの再定義...)
- 同じクラスターに 2 つのストレージ エンジンがあるのはなぜですか (従来の HDFS / GS バケット)
私のフルーム構成
a1.sources = http
a1.sinks = hdfs_sink
a1.channels = mem
# Describe/configure the source
a1.sources.http.type = org.apache.flume.source.http.HTTPSource
a1.sources.http.port = 9000
# Describe the sink
a1.sinks.hdfs_sink.type = hdfs
a1.sinks.hdfs_sink.hdfs.path = hdfs://ip.to.my.cluster:8020/%{env}/%{tenant}/%{type}/%y-%m-%d
a1.sinks.hdfs_sink.hdfs.filePrefix = %H-%M-%S_
a1.sinks.hdfs_sink.hdfs.fileSuffix = .json
a1.sinks.hdfs_sink.hdfs.round = true
a1.sinks.hdfs_sink.hdfs.roundValue = 10
a1.sinks.hdfs_sink.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
a1.channels.mem.type = memory
a1.channels.mem.capacity = 1000
a1.channels.mem.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.http.channels = mem
a1.sinks.hdfs_sink.channel = mem
行 a1.sinks.hdfs_sink.hdfs.path はgs://
パスを受け入れますか?
その場合、どのようなセットアップが必要ですか (追加の jar、クラスパス) ?
ありがとう