0

Flink で RollingSink を使用して、AVRO にシリアル化されたケース クラスを HDFS に書き込もうとしています。HDFS で avro ファイルをデシリアライズ可能にするために、FSDataOutputStream をラップする DataFileWriter を使用します。HDFS のデータ ファイルを閉じるために DataFileWriter と FSDataOutputStream を同期しようとすると、例外がスローされ、実際には他のすべてのファイルでデータが取得されます。Flink Writer 実装で fs ストリームを Avro ライターと同期する方法はありますか?

DataFileWriter close() flush() sync() fsync() を使用しようとしましたが、すべて失敗しました。同期方法が最適に機能するようです。また、動作しているように見える書き込み方法で同期を試みましたが、それでもエラーが発生し、すべてのデータがファイルに保存されているかどうかを確認できませんでした。

class AvroWriter[OutputContainer <: org.apache.avro.specific.SpecificRecordBase] extends Writer[OutputContainer] {

  val serialVersionUID = 1L

  var outputStream: FSDataOutputStream = null
  var outputWriter: DataFileWriter[OutputContainer] = null

  override def open(outStream: FSDataOutputStream): Unit = {
    if (outputStream != null) {
      throw new IllegalStateException("AvroWriter has already been opened.")
    }
    outputStream = outStream

    if(outputWriter == null) {
      val writer: DatumWriter[OutputContainer] = new SpecificDatumWriter[OutputContainer](OutputContainer.SCHEMA$)
      outputWriter = new DataFileWriter[OutputContainer](writer)
      outputWriter.create(OutputContainer.SCHEMA$, outStream)
    }
  }

  override def flush(): Unit = {}

  override def close(): Unit = {
    if(outputWriter != null) {
      outputWriter.sync()
    }
    outputStream = null
  }

  override def write(element: OutputContainer) = {
    if (outputStream == null) {
      throw new IllegalStateException("AvroWriter has not been opened.")
    }
    outputWriter.append(element)
  }

  override def duplicate(): AvroWriter[OutputContainer] = {
    new AvroWriter[OutputContainer]
  }
}

上記のコードで RollingSink を実行しようとすると、次の例外が発生します。

java.lang.Exception: Could not forward element to next operator
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
        at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:664)
Caused by: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1353)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
        at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
        at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
        at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
        at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:401)
        at pl.neptis.FlinkKafkaConsumer.utils.AvroWriter.close(AvroWriter.scala:36)
        at org.apache.flink.streaming.connectors.fs.RollingSink.closeCurrentPartFile(RollingSink.java:476)
        at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:419)
        at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:373)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
        ... 3 more
4

1 に答える 1