4

サメ/スパーク wiki の開発部分は非常に短いので、テーブルをプログラムでクエリするためのコードをまとめてみました。ここにあります ...

object Test extends App {
  val master = "spark://localhost.localdomain:8084"
  val jobName = "scratch"

  val sparkHome = "/home/shengc/Downloads/software/spark-0.6.1"
  val executorEnvVars = Map[String, String](
    "SPARK_MEM" -> "1g",
    "SPARK_CLASSPATH" -> "",
    "HADOOP_HOME" -> "/home/shengc/Downloads/software/hadoop-0.20.205.0",
    "JAVA_HOME" -> "/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64",
    "HIVE_HOME" -> "/home/shengc/Downloads/software/hive-0.9.0-bin"
  )

  val sc = new shark.SharkContext(master, jobName, sparkHome, Nil, executorEnvVars) 
  sc.sql2console("create table src")
  sc.sql2console("load data local inpath '/home/shengc/Downloads/software/hive-0.9.0-bin/examples/files/kv1.txt' into table src")
  sc.sql2console("select count(1) from src")
}

テーブル src を作成し、データを src に正常にロードできますが、最後のクエリで NPE がスローされて失敗しました。出力は次のとおりです...

13/01/06 17:33:20 INFO execution.SparkTask: Executing shark.execution.SparkTask
13/01/06 17:33:20 INFO shark.SharkEnv: Initializing SharkEnv
13/01/06 17:33:20 INFO execution.SparkTask: Adding jar file:///home/shengc/workspace/shark/hive/lib/hive-builtins-0.9.0.jar
java.lang.NullPointerException
    at shark.execution.SparkTask$$anonfun$execute$5.apply(SparkTask.scala:58)
    at shark.execution.SparkTask$$anonfun$execute$5.apply(SparkTask.scala:55)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
    at shark.execution.SparkTask.execute(SparkTask.scala:55)
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:134)
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1326)
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1118)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:951)
    at shark.SharkContext.sql(SharkContext.scala:58)
    at shark.SharkContext.sql2console(SharkContext.scala:84)
    at Test$delayedInit$body.apply(Test.scala:20)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:60)
    at scala.App$$anonfun$main$1.apply(App.scala:60)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:76)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
    at scala.App$class.main(App.scala:60)
    at Test$.main(Test.scala:4)
    at Test.main(Test.scala)
FAILED: Execution Error, return code -101 from shark.execution.SparkTask13/01/06 17:33:20 ERROR ql.Driver: FAILED: Execution Error, return code -101 from shark.execution.SparkTask
13/01/06 17:33:20 INFO ql.Driver: </PERFLOG method=Driver.execute start=1357511600030 end=1357511600054 duration=24>
13/01/06 17:33:20 INFO ql.Driver: <PERFLOG method=releaseLocks>
13/01/06 17:33:20 INFO ql.Driver: </PERFLOG method=releaseLocks start=1357511600054 end=1357511600054 duration=0>

ただし、bin/shark-withinfo によって呼び出されるシェル内で select * from src と入力して、src テーブルをクエリできます。

「bin/shark-shell」によってトリガーされたシェルでそのSQLを試してみませんか?ええと、私はその殻に入ることはできません。これが私が遭遇したエラーです...

https://groups.google.com/forum/?fromgroups=#!topic/shark-users/glZzrUfabGc

[編集 1]: この NPE は、SharkENV.sc が設定されていないために発生しているように見えるため、追加しました

shark.SharkEnv.sc = sc

sql2console 操作が実行される直前。その後、scala.tools.nsc の ClassNotFoundException が発生したため、手動で scala-compiler をクラスパスに追加しました。その後、コードは別の ClassNotFoundException を訴えました。クラスパスにサメの瓶を入れたので、修正方法がわかりません。

13/01/06 18:09:34 INFO cluster.TaskSetManager: Lost TID 1 (task 1.0:1)
13/01/06 18:09:34 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: shark.execution.TableScanOperator$$anonfun$preprocessRdd$3
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)

[編集 2]: わかりました。インタラクティブな repl を初期化する方法のサメのソース コードを正確に従うことによって、私が望むものを満たすことができる別のコードを見つけました。

System.setProperty("MASTER", "spark://localhost.localdomain:8084")
System.setProperty("SPARK_MEM", "1g")
System.setProperty("SPARK_CLASSPATH", "")
System.setProperty("HADOOP_HOME", "/home/shengc/Downloads/software/hadoop-0.20.205.0")
System.setProperty("JAVA_HOME", "/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64")
System.setProperty("HIVE_HOME", "/home/shengc/Downloads/software/hive-0.9.0-bin")
System.setProperty("SCALA_HOME", "/home/shengc/Downloads/software/scala-2.9.2")

shark.SharkEnv.initWithSharkContext("scratch")
val sc = shark.SharkEnv.sc.asInstanceOf[shark.SharkContext]

sc.sql2console("select * from src")

これは醜いですが、少なくとも機能します。より堅牢なコードを書く方法についてのコメントは大歓迎です!!

プログラムでサメを操作したい人は、すべてのハイブとサメの jar が CLASSPATH にある必要があり、scala コンパイラもクラスパスにある必要があることに注意してください。もう 1 つの重要な点は、hadoop の conf もクラスパスにある必要があることです。

4

1 に答える 1