2

関数 - parse_urlは、spark-sql throw sql-client (thrift サーバー経由)、IPython、pyspark-shell を使用する場合は常に正常に動作しますが、spark-submitモードをスローすると動作しません。

/opt/spark/bin/spark-submit --driver-memory 4G --executor-memory 8G main.py

エラーは次のとおりです。

Traceback (most recent call last):
  File "/home/spark/***/main.py", line 167, in <module>
    )v on registrations.ga = v.ga and reg_path = oldtrack_page and registration_day = day_cl_log  and date_cl_log <= registration_date""")
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 552, in sql
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 40, in deco
pyspark.sql.utils.AnalysisException: undefined function parse_url;
Build step 'Execute shell' marked build as failure
Finished: FAILURE

したがって、ここでは回避策を使用しています。

def python_parse_url(url, que, key):
    import urlparse
    ians = None
    if que == "QUERY":
        ians = urlparse.parse_qs(urlparse.urlparse(url).query)[key][0]
    elif que == "HOST":
        ians = urlparse.urlparse(url).hostname
    elif que == "PATH":
        ians = urlparse.urlparse(url).path
    return ians

def dc_python_parse_url(url, que, key):
    ians = None
    try:
        ians = python_parse_url(url, que, key)
    except:
        pass
    return ians

sqlCtx.registerFunction('my_parse_url', dc_python_parse_url)

この問題について何か助けてください。

4

1 に答える 1

9

火花 >= 2.0

以下と同じですがSparkSession、Hive サポートを有効にして使用します。

SparkSession.builder.enableHiveSupport().getOrCreate()

火花 < 2.0

parse_url古典的な SQL 関数ではありません。これは Hive UDF であるため、機能するには次のものが必要HiveContextです。

from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext

sc = SparkContext()

sqlContext = SQLContext(sc)
hivContext = HiveContext(sc)

query = """SELECT parse_url('http://example.com/foo/bar?foo=bar', 'HOST')"""

sqlContext.sql(query)
## Py4JJavaError                             Traceback (most recent call last)
##   ...
## AnalysisException: 'undefined function parse_url;'

hivContext.sql(query)
## DataFrame[_c0: string]
于 2016-04-04T18:57:34.110 に答える