1

Spark スタンドアロン クラスターで比較的単純な Spark SQL コマンドを実行しようとしています。

select a.name, b.name, s.score
from score s
inner join A a on a.id = s.a_id
inner join B b on b.id = s.b_id
where pmod(a.id, 3) != 3 and pmod(b.id, 3) != 0

テーブルサイズは以下の通り

A: 25,000
B: 2,500,000
score: 25,000,000

したがって、これから 25,000,000 行の結果が得られると予想されます。このクエリを Spark SQL で実行し、各行を処理したいと考えています。関連するスパークコードは次のとおりです

val sqlContext = new HiveContext(sc)
val sql = "<above SQL>"
sqlContext.sql(sql).first

このコマンドは、テーブル スコアのサイズが 200,000 の場合は正常に実行されますが、現在は実行されません。関連するログは次のとおりです

14/12/04 16:35:14 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:35:43 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:36:24 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:37:11 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:38:13 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:39:19 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:39:48 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:40:08 WARN MemoryStore: Not enough space to store block broadcast_12 in memory! Free memory is 1938057068 bytes.
14/12/04 16:40:08 WARN MemoryStore: Persisting block broadcast_12 to disk instead.
java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.sql.execution.BroadcastHashJoin.execute(joins.scala:431)
    at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:42)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:111)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
    at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:440)
    at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:103)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1092)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
    at $iwC$$iwC$$iwC.<init>(<console>:25)
    at $iwC$$iwC.<init>(<console>:27)
    at $iwC.<init>(<console>:29)
    at <init>(<console>:31)
    at .<init>(<console>:35)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

私の最初の考えは、このタイムアウトを増やすことでしたが、ここに示すようにソースを再コンパイルしない限り、これは不可能に見えます。親ディレクトリにもいくつかの異なる結合が表示されますが、spark でこれらの他のタイプの結合を使用する方法がわかりません。

また、spark.executor.memory を 10g まで増やして、ディスクへの永続化に関する最初の警告を修正しようとしましたが、問題は解決しませんでした。

このクエリを実際に実行する方法を知っている人はいますか?

4

1 に答える 1