1

スパーク構造ストリーミングを使用する簡単なテスト ケースを作成しようとしています。コードはgithubのholdenkに触発されています。

これがCustomSinkコードです

case class CustomSink(func: DataFrame => Unit)
  extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}

class CustomSinkProvider extends StreamSinkProvider {
  def func(df: DataFrame) {
    df.show(5)
  }

  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): CustomSink = {
    new CustomSink(func)
  }
}

MemoryStream を使用してテストケースで実行しようとしています

@Test
def demoCustomSink: Unit = {
  val input = MemoryStream[String]
  val doubled = input.toDS().map(x => x + " " + x)

  // input.addData("init")

  val query = doubled.writeStream
    .queryName("testCustomSinkBasic")
    .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
    .start()

  input.addData("hi")

  query.processAllAvailable()
}

行がない場合はエラーを報告しますinput.addData("init")

2016-10-12 03:48:37 ERROR StreamExecution       :91 - Query testCustomSinkBasic terminated with error
java.lang.RuntimeException: No data selected!
  at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:109)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120)

initラインを追加するとシンクに届かないinput.addData("init")

行のコメントを外すと、エラーを報告せずにテスト ケースを正常に実行できますinput.addData("init")

しかし、値initはシンクに届きません。値のみhi hiが表示されます。

なぜ、どうすれば解決できますか?

4

1 に答える 1

0

バックグラウンドでチェックポイント メカニズムがあります。チェックポイントディレクトリにデータがあるとうまくいきません。

次のコードを使用して作成し、ヘルパー メソッドを使用してディレクトリをクリアします。

val checkpointPath = Files.createTempDirectory("query")
val checkpointDir = checkpointPath.toFile

checkpointDir.deleteOnExit()

def deleteRecursively(file: java.io.File): Unit = {
  if (file.isDirectory) {
    file.listFiles().foreach(deleteRecursively)
    file.delete()
  }
  else
    file.delete()
}

def clearCheckpointDir: Unit = {
  checkpointDir.listFiles().foreach(deleteRecursively)
}

lazy val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.streaming.checkpointLocation",
    checkpointDir.getAbsolutePath)
  .master("local[*]")
  .appName("test")
  .getOrCreate()

次に、テスト ケースに次のコードを追加し、カスタム シンクが期待どおりに動作するようにしました。

@Before
def before: Unit = {
  clearCheckpointDir()
}
于 2016-10-12T20:18:23.543 に答える