ユーザー入力として SQL 文字列を取得し、実行前に変換したいと考えています。特に、最上位のプロジェクション (select 句) を変更して、クエリによって取得される追加の列を挿入したいと考えています。
を使用して Catalyst に接続することで、これを実現したいと考えていましたsparkSession.experimental.extraOptimizations
。私が試みていることは、厳密に言えば最適化 (変換によって SQL ステートメントのセマンティクスが変更される) ではないことはわかっていますが、それでも API は適しているようです。ただし、私の変換はクエリ実行プログラムによって無視されているようです。
これは、私が抱えている問題を説明するための最小限の例です。最初に行ケース クラスを定義します。
case class TestRow(a: Int, b: Int, c: Int)
次に、プロジェクションを単純に破棄する最適化ルールを定義します。
object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case x: Project => x.child
}
}
データセットを作成し、最適化を登録して、SQL クエリを実行します。
// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)
// Register "optimisation".
sparkSession.experimental.extraOptimizations =
Seq(RemoveProjectOptimisationRule)
// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")
// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)
出力は次のとおりです。
Query result:
[1]
== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
+- 'UnresolvedRelation `testtable`
== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
+- SubqueryAlias testtable
+- LocalRelation [a#3, b#4, c#5]
== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]
== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]
結果は、変換が適用されていない元の SQL ステートメントの結果と同じであることがわかります。それでも、論理的および物理的な計画を印刷するとき、投影は実際に削除されています。また、(デバッグ ログ出力を通じて) 変換が実際に呼び出されていることを確認しました。
ここで何が起こっているかについて何か提案はありますか? オプティマイザーは、セマンティクスを変更する「最適化」を単純に無視するのでしょうか?
最適化の使用が適切でない場合、誰か代替案を提案できますか? 私が実際にやりたいことは、入力 SQL ステートメントを解析して変換し、変換された AST を実行のために Spark に渡すことだけです。しかし、私が見る限り、これを行うための API は Sparksql
パッケージ専用です。リフレクションを使用することは可能かもしれませんが、それは避けたいと思います。
どんなポインタでも大歓迎です。