1

Apache Flink を使用してプログラムを作成しているときに行き詰まりました。問題は、計算の結果としてHadoop の MapFileを生成しようとしているのに、Scala コンパイラが型の不一致について不平を言うことです。

問題を説明するために、2 種類の出力を生成しようとする以下のコード スニペットを示します。1 つはHadoop の SequenceFileで、もう 1 つは MapFile です。

val dataSet: DataSet[(IntWritable, BytesWritable)] =
  env.readSequenceFile(classOf[Text], classOf[BytesWritable], inputSequenceFile.toString)
    .map(mapper(_))
    .partitionCustom(partitioner, 0)
    .sortPartition(0, Order.ASCENDING)

val seqOF = new HadoopOutputFormat(
  new SequenceFileOutputFormat[IntWritable, BytesWritable](), Job.getInstance(hadoopConf)
)

val mapfileOF = new HadoopOutputFormat(
  new MapFileOutputFormat(), Job.getInstance(hadoopConf)
)

val dataSink1 = dataSet.output(seqOF)  // it typechecks!
val dataSink2 = dataSet.output(mapfileOF) // syntax error

上でコメントしたように、dataSet.output(mapfileOF) により、Scala コンパイラは次のように文句を言います: ここに画像の説明を入力 参考までに、SequenceFile と比較して、MapFile は、キーが WritableComparable でなければならないというより強い条件を呼び出します。

Flink を使用してアプリケーションを作成する前に、以下のように Spark を使用して実装しましたが、正常に動作しました (コンパイル エラーは発生せず、エラーも発生せずに正常に実行されます)。

val rdd = sc
  .sequenceFile(inputSequenceFile.toString, classOf[Text], classOf[BytesWritable])
  .map(mapper(_))
  .repartitionAndSortWithinPartitions(partitioner)

rdd.saveAsNewAPIHadoopFile(
  outputPath.toString,
  classOf[IntWritable],
  classOf[BytesWritable],
  classOf[MapFileOutputFormat]
)  
4

1 に答える 1

0

確認しましたか: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/hadoop_compatibility.html#using-hadoop-outputformats

次の例が含まれています。

// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)
于 2016-06-02T11:29:42.510 に答える