Embedded Kafka を使用して統合テストを作成しようとしていますが、NullPointerException が発生し続けます。私のテストケースは非常に単純です。次の手順があります。
- JSON ファイルを読み取り、inputTopic にメッセージを書き込みます。
- 「readStream」操作を実行します。
- ストリームで「選択」を行います。これにより、NullPointerException がスローされます。
私は何を間違っていますか?コードを以下に示します。
"My Test which runs with Embedded Kafka" should "Generate correct Result" in {
implicit val config: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(
kafkaPort = 9066,
zooKeeperPort = 2066,
Map("log.dir" -> "./src/test/resources/")
)
withRunningKafka {
createCustomTopic(inputTopic)
val source = Source.fromFile("src/test/resources/test1.json")
source.getLines.toList.filterNot(_.isEmpty).foreach(
line => publishStringMessageToKafka(inputTopic, line)
)
source.close()
implicit val deserializer: StringDeserializer = new StringDeserializer
createCustomTopic(outputTopic)
import spark2.implicits._
val schema = spark.read.json("my.json").schema
val myStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9066")
.option("subscribe", inputTopic)
.load()
// Schema looks good
myStream.printSchema()
// Following line throws NULLPointerException! Why?
val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
// There's more code... but let's not worry about that for now.
}
}
要求どおり..ここにStackTraceがあります:
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1$$anonfun$apply$1.apply(EmbeddedKafka.scala:220)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1$$anonfun$apply$1.apply(EmbeddedKafka.scala:213)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withTempDir(EmbeddedKafka.scala:279)
at com.MyTestClass.withTempDir(MyTestClass.scala:18)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1.apply(EmbeddedKafka.scala:213)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1.apply(EmbeddedKafka.scala:212)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningZooKeeper$1.apply(EmbeddedKafka.scala:268)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningZooKeeper$1.apply(EmbeddedKafka.scala:265)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withTempDir(EmbeddedKafka.scala:279)
at com.MyTestClass.withTempDir(MyTestClass.scala:18)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withRunningZooKeeper(EmbeddedKafka.scala:265)
at com.MyTestClass.withRunningZooKeeper(MyTestClass.scala:18)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withRunningKafka(EmbeddedKafka.scala:212)
at com.MyTestClass.withRunningKafka(MyTestClass.scala:18)
at com.MyTestClass$$anonfun$1.apply$mcV$sp(MyTestClass.scala:47)
at com.MyTestClass$$anonfun$1.apply(MyTestClass.scala:38)
at com.MyTestClass$$anonfun$1.apply(MyTestClass.scala:38)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
at com.MyTestClass.org$scalatest$BeforeAndAfter$$super$runTest(MyTestClass.scala:18)
at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
at com.MyTestClass.runTest(MyTestClass.scala:18)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:393)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:370)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:407)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:376)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
at org.scalatest.Suite$class.run(Suite.scala:1124)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
at com.MyTestClass.org$scalatest$BeforeAndAfterAll$$super$run(MyTestClass.scala:18)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.MyTestClass.org$scalatest$BeforeAndAfter$$super$run(MyTestClass.scala:18)
at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)
at com.MyTestClass.run(MyTestClass.scala:18)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1349)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1343)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1012)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)