3

TL;DR:

私の非常に単純な Spark Streaming アプリケーションは、ドライバーで「KafkaException: 文字列が最大サイズを超えています」で失敗します。エグゼキューターにも同じ例外が表示されますが、エグゼキューターのログのどこかで、他の情報が含まれていない IllegalArgumentException も見つかりました

完全な問題:

Spark Streaming を使用して、Kafka トピックからいくつかのメッセージを読み取ります。これは私がやっていることです:

val conf = new SparkConf().setAppName("testName")
val streamingContext = new StreamingContext(new SparkContext(conf), Milliseconds(millis))
val kafkaParams = Map(
      "metadata.broker.list" -> "somevalidaddresshere:9092",
      "auto.offset.reset" -> "largest"
    )
val topics = Set("data")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext,
      kafkaParams,
      topics
    ).map(_._2) // only need the values not the keys

私が Kafka データで行っているのは、次を使用して印刷することだけです。

stream.print()

私のアプリケーションには明らかにこれよりも多くのコードがありますが、問題を特定するために、コードから可能な限りすべてを取り除きました

このコードを YARN で実行しようとしています。これは私のスパーク送信行です:

./spark-submit --class com.somecompany.stream.MainStream --master yarn --deploy-mode cluster myjar.jar hdfs://some.hdfs.address.here/user/spark/streamconfig.properties

streamconfig.properties ファイルは単なる通常のプロパティ ファイルであり、ここでの問題とはおそらく無関係です。

アプリケーションを実行しようとすると、ドライバーで次の例外が発生してすぐに失敗します。

16/05/10 06:15:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, some.hdfs.address.here): kafka.common.KafkaException: String exceeds the maximum size of 32767.
    at kafka.api.ApiUtils$.shortStringLength(ApiUtils.scala:73)
    at kafka.api.TopicData$.headerSize(FetchResponse.scala:107)
    at kafka.api.TopicData.<init>(FetchResponse.scala:113)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:103)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

自分のコードがスタック トレースに表示されない

エグゼキューターを調べると、ドライバーと同じ例外が見つかりましたが、深く埋もれているのは次の例外です。

16/05/10 06:40:47 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 8)
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

情報が含まれていないため、IllegalArgument が何であるかわかりません

私のYARNが使用しているSparkのバージョンは1.6.0です。また、pom に以前のバージョンではなく Spark 1.6.0 が含まれていることも確認しました。私のスコープは「提供」されています

まったく同じトピックからデータを手動で読み取りましたが、そこにあるデータは単なる JSON です。そこにあるデータは決して巨大ではありません。32767 よりも確実に小さいです。また、通常のコマンド ライン コンシューマを使用してこのデータを読み取ることができるので、これは奇妙です

残念ながら、この例外をグーグルで検索しても、有用な情報は得られませんでした

ここで何が問題なのかを正確に理解する方法について誰か考えがありますか?

前もって感謝します

4

1 に答える 1

2

たくさん掘り下げた後、問題が何であるかがわかったと思います。YARN (1.6.0-cdh5.7.0) で Spark を実行しています。Cloudera には新しい Kafka クライアント (0.9 バージョン) があり、以前のバージョンからプロトコル間の変更がありました。ただし、Kafka のバージョンは 0.8.2 です。

于 2016-12-13T13:08:55.840 に答える