1

ユーザー入力として SQL 文字列を取得し、実行前に変換したいと考えています。特に、最上位のプロジェクション (select 句) を変更して、クエリによって取得される追加の列を挿入したいと考えています。

を使用して Catalyst に接続することで、これを実現したいと考えていましたsparkSession.experimental.extraOptimizations。私が試みていることは、厳密に言えば最適化 (変換によって SQL ステートメントのセマンティクスが変更される) ではないことはわかっていますが、それでも API は適しているようです。ただし、私の変換はクエリ実行プログラムによって無視されているようです。

これは、私が抱えている問題を説明するための最小限の例です。最初に行ケース クラスを定義します。

case class TestRow(a: Int, b: Int, c: Int)

次に、プロジェクションを単純に破棄する最適化ルールを定義します。

object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
        case x: Project => x.child
    }
}

データセットを作成し、最適化を登録して、SQL クエリを実行します。

// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)

// Register "optimisation".
sparkSession.experimental.extraOptimizations =  
    Seq(RemoveProjectOptimisationRule)

// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")

// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)

出力は次のとおりです。

Query result: 
[1]

== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
   +- 'UnresolvedRelation `testtable`

== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
   +- SubqueryAlias testtable
      +- LocalRelation [a#3, b#4, c#5]

== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]

== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]

結果は、変換が適用されていない元の SQL ステートメントの結果と同じであることがわかります。それでも、論理的および物理的な計画を印刷するとき、投影は実際に削除されています。また、(デバッグ ログ出力を通じて) 変換が実際に呼び出されていることを確認しました。

ここで何が起こっているかについて何か提案はありますか? オプティマイザーは、セマンティクスを変更する「最適化」を単純に無視するのでしょうか?

最適化の使用が適切でない場合、誰か代替案を提案できますか? 私が実際にやりたいことは、入力 SQL ステートメントを解析して変換し、変換された AST を実行のために Spark に渡すことだけです。しかし、私が見る限り、これを行うための API は Sparksqlパッケージ専用です。リフレクションを使用することは可能かもしれませんが、それは避けたいと思います。

どんなポインタでも大歓迎です。

4

2 に答える 2

4

ご想像のとおり、オプティマイザーがクエリの結果を変更しないと仮定しているため、これは機能しません。

具体的には、アナライザーから出力されるスキーマをキャッシュします (オプティマイザーによって変更されないと仮定します)。行を外部形式に変換する場合、このスキーマを使用するため、結果の列が切り捨てられます。切り捨て以上のことをした場合 (つまり、データ型を変更した場合)、これはクラッシュすることさえあります。

このノートブックでわかるように、実際には、内部で期待される結果が得られています。近い将来、クエリ実行の他のフェーズでプランを変更できるフックをさらにオープンにする予定です。詳細については、 SPARK-18127を参照してください。

于 2016-10-26T23:52:31.820 に答える