
Question updated

I am using Spark 1.2.0 (Hadoop 2.4). I have defined SchemaRDDs from data files in HDFS, and I would like to be able to query them as tables through HiveServer2. I get a runtime exception while attempting saveAsTable and would like guidance on how to proceed.

Source code:

package foo.bar

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.hive._

object HiveDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Demo")
    val sc = new SparkContext(conf)

    // sc is an existing SparkContext.
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Create an RDD
    val zipRDD = sc.textFile("/model-inputs/all_zip_state.csv")

    // The schema is encoded in a string
    val schemaString = "ODSMEMBERID,ZIPCODE,STATE,TEST_SUPPLIERID,ratio_death_readm_low,ratio_death_readm_high,regions"

    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (zip) to Rows.
    val rowRDD = zipRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), ""))

    // Apply the schema to the RDD.
    val zipSchemaRDD = hiveContext.applySchema(rowRDD, schema)

    // HiveContext's save as Table
    zipSchemaRDD.saveAsTable("allzipstable")

  }
}

spark-submit command:

./bin/spark-submit  --class foo.bar.HiveDemo --master yarn-cluster --jars /usr/lib/hive/lib/hive-metastore.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 lib/datapipe_2.10-1.0.jar 10

Exception at runtime on the node:

15/01/29 22:35:50 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:36)
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook

Another attempt:

package foo.bar

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql._

case class AllZips(
  ODSMEMBERID: String,
  ZIPCODE: String,
  STATE: String,
  TEST_SUPPLIERID: String,
  ratio_death_readm_low: String,
  ratio_death_readm_high: String,
  regions: String)

object HiveDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HiveDemo")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext._
    val allZips = sc.textFile("/model-inputs/all_zip_state.csv").map(_.split(",")).map(p => AllZips(p(0), p(1), p(2), p(3), p(4), p(5), ""))
    val allZipsSchemaRDD = createSchemaRDD(allZips)
    allZipsSchemaRDD.saveAsTable("allzipstable")
  }
}

Exception on the node:

15/01/30 00:28:19 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:22)
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook

2 Answers


You need to use a HiveContext

Here are the Java/Scala docs:

   * Note that this currently only works with SchemaRDDs that are created from a HiveContext as
   * there is no notion of a persisted catalog in a standard SQL context. 


  @Experimental
  def saveAsTable(tableName: String): Unit =
    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd

So change this in your code:

val sc = new HiveContext(sc)

Actually, you should rename it:

val sqlc = new HiveContext(sc)
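
Applied to the first snippet in the question, the top of main() would then look roughly like this (just a sketch with the rename applied; the schema and rowRDD are built exactly as before):

    val conf = new SparkConf().setAppName("Demo")
    val sc = new SparkContext(conf)

    // HiveContext is built from the SparkContext (not the SparkConf)
    val sqlc = new org.apache.spark.sql.hive.HiveContext(sc)

    // ...build schema and rowRDD as in the question...
    val zipSchemaRDD = sqlc.applySchema(rowRDD, schema)
    zipSchemaRDD.saveAsTable("allzipstable")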

FYI: here is more info on registering tables (in SQLContext); note that if you do it this way, the table is only temporary:

  /**
   *  Temporary tables exist only
   * during the lifetime of this instance of SQLContext.
   *
   * @group userf
   */
  def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
    catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
  }
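
For example (a sketch reusing the question's zipSchemaRDD; the table name allzips_tmp is just an illustration), a temporary table is registered and queried through the same context, and disappears with it:

    // Register the SchemaRDD as a temporary table -- it exists only for this context instance
    zipSchemaRDD.registerTempTable("allzips_tmp")

    // Query it back through the same HiveContext
    sqlc.sql("SELECT ZIPCODE, STATE FROM allzips_tmp LIMIT 10").collect().foreach(println)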

UPDATE: The new stack trace contains this phrase:

Unresolved plan found, tree:

That usually means there is a column that does not match the underlying table. I will dig further to see if I can isolate it, but in the meantime you might also look at it from that angle.
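
A quick way to check that angle (a sketch, reusing the names from the question) is to print the schema Spark SQL actually applied and, if the target table already exists in the metastore, compare it against the table definition:

    // Show the column names and types the SchemaRDD carries
    zipSchemaRDD.printSchema()

    // If allzipstable already exists, inspect its definition for mismatched columns
    sqlc.sql("DESCRIBE allzipstable").collect().foreach(println)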

Answered 2015-01-29T23:42:31.757

The createSchemaRDD code snippet above works correctly on Spark 1.2.1.

There was a CTAS defect in 1.2.0.
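
If upgrading is not immediately possible, one untested workaround sketch on 1.2.0 is to avoid the CTAS path entirely: create the Hive table up front with HiveQL and then insert into it (names taken from the second snippet above):

    // Create the target table first via HiveQL (all STRING columns here)
    hiveContext.sql("""CREATE TABLE IF NOT EXISTS allzipstable (
      ODSMEMBERID STRING, ZIPCODE STRING, STATE STRING, TEST_SUPPLIERID STRING,
      ratio_death_readm_low STRING, ratio_death_readm_high STRING, regions STRING)""")

    // Insert the SchemaRDD's rows into the existing table instead of relying on CTAS
    allZipsSchemaRDD.insertInto("allzipstable")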

Answered 2015-02-14T16:55:49.393