Apache Flink を使用してプログラムを作成しているときに行き詰まりました。問題は、計算の結果としてHadoop の MapFileを生成しようとしているのに、Scala コンパイラが型の不一致について不平を言うことです。
問題を説明するために、2 種類の出力を生成しようとする以下のコード スニペットを示します。1 つはHadoop の SequenceFileで、もう 1 つは MapFile です。
val dataSet: DataSet[(IntWritable, BytesWritable)] =
env.readSequenceFile(classOf[Text], classOf[BytesWritable], inputSequenceFile.toString)
.map(mapper(_))
.partitionCustom(partitioner, 0)
.sortPartition(0, Order.ASCENDING)
val seqOF = new HadoopOutputFormat(
new SequenceFileOutputFormat[IntWritable, BytesWritable](), Job.getInstance(hadoopConf)
)
val mapfileOF = new HadoopOutputFormat(
new MapFileOutputFormat(), Job.getInstance(hadoopConf)
)
val dataSink1 = dataSet.output(seqOF) // it typechecks!
val dataSink2 = dataSet.output(mapfileOF) // syntax error
上でコメントしたように、dataSet.output(mapfileOF) により、Scala コンパイラは次のように文句を言います:
参考までに、SequenceFile と比較して、MapFile は、キーが WritableComparable でなければならないというより強い条件を呼び出します。
Flink を使用してアプリケーションを作成する前に、以下のように Spark を使用して実装しましたが、正常に動作しました (コンパイル エラーは発生せず、エラーも発生せずに正常に実行されます)。
val rdd = sc
.sequenceFile(inputSequenceFile.toString, classOf[Text], classOf[BytesWritable])
.map(mapper(_))
.repartitionAndSortWithinPartitions(partitioner)
rdd.saveAsNewAPIHadoopFile(
outputPath.toString,
classOf[IntWritable],
classOf[BytesWritable],
classOf[MapFileOutputFormat]
)