SparkRunner を使用して実行し、ローカル ファイルから読み取り、HDFS に書き込む Beam パイプラインを作成しようとしています。
最小限の例を次に示します。
オプションクラス -
package com.mycompany.beam.hdfsIOIssue;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
public interface WritingToHDFSOptions extends PipelineOptions, SparkPipelineOptions, HadoopFileSystemOptions {
@Validation.Required
@Description("Path of the local file to read from")
String getInputFile();
void setInputFile(String value);
@Validation.Required
@Description("Path of the HDFS to write to")
String getOutputFile();
void setOutputFile(String value);
}
ビームメインクラス -
package com.mycompany.beam.hdfsIOIssue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
public class WritingToHDFS {
public static void main(String[] args) {
PipelineOptionsFactory.register(WritingToHDFSOptions.class);
WritingToHDFSOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WritingToHDFSOptions.class);
Pipeline p = Pipeline.create(options);
buildPipeline(p, options);
p.run();
}
static void buildPipeline(Pipeline p, WritingToHDFSOptions options) {
PCollection<String> input = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(options.getOutputFile());
TextIO.Write write = TextIO.write().to(resource);
input.apply("WriteLines", write);
}
}
次のように実行します。
spark-submit test --master yarn --deploy-mode cluster --class com.mycompany.beam.hdfsIOIssue.WritingToHDFS my-project-bundled-0.1-SNAPSHOT.jar --runner=SparkRunner --inputFile=testInput --outputFile=hdfs://testOutput
予想されること: ローカルの testInput ファイルの行を読み取り、hdfs ホーム ディレクトリにある testOutput という名前の新しいファイルに書き込みます。
実際に何が起こるか: 私が知る限り、何もありません。Spark はジョブが正常に完了したことを示し、ログに Beam の手順が表示されますが、hdfs またはローカル ディレクトリに書き込まれた testOutput という名前のファイルまたはディレクトリはありません。おそらく、spark executor ノードでローカルに書き込まれているのかもしれませんが、チェックするためのアクセス権がありません。
TextIO インターフェイスを間違って使用しているか、PipelineOptions インターフェイスに追加するだけでなく、ファイルシステムを構成するためにさらに多くのことを行う必要があると推測しています。しかし、その方法を説明するドキュメントが見つかりません。