1

Hive テーブルにアクセスする BigInsights on Cloud 4.2 Enterprise で pyspark スクリプトを実行しようとしています。

まず、ハイブ テーブルを作成します。

[biadmin@bi4c-xxxxx-mastermanager ~]$ hive
hive> CREATE TABLE pokes (foo INT, bar STRING);
OK
Time taken: 2.147 seconds
hive> LOAD DATA LOCAL INPATH '/usr/iop/4.2.0.0/hive/doc/examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
Loading data to table default.pokes
Table default.pokes stats: [numFiles=1, numRows=0, totalSize=5812, rawDataSize=0]
OK
Time taken: 0.49 seconds
hive> 

次に、単純な pyspark スクリプトを作成します。

[biadmin@bi4c-xxxxxx-mastermanager ~]$ cat test_pokes.py
from pyspark import SparkContext

sc = SparkContext()

from pyspark.sql import HiveContext
hc = HiveContext(sc)

pokesRdd = hc.sql('select * from pokes')
print( pokesRdd.collect() )

私は実行しようとします:

[biadmin@bi4c-xxxxxx-mastermanager ~]$ spark-submit \
    --master yarn-cluster \
    --deploy-mode cluster \
    --jars /usr/iop/4.2.0.0/hive/lib/datanucleus-api-jdo-3.2.6.jar, \
           /usr/iop/4.2.0.0/hive/lib/datanucleus-core-3.2.10.jar, \
           /usr/iop/4.2.0.0/hive/lib/datanucleus-rdbms-3.2.9.jar \
    test_pokes.py

ただし、次のエラーが発生します。

Traceback (most recent call last):
  File "test_pokes.py", line 8, in <module>
    pokesRdd = hc.sql('select * from pokes')
  File "/disk6/local/usercache/biadmin/appcache/application_1477084339086_0481/container_e09_1477084339086_0481_01_000001/pyspark.zip/pyspark/sql/context.py", line 580, in sql
  File "/disk6/local/usercache/biadmin/appcache/application_1477084339086_0481/container_e09_1477084339086_0481_01_000001/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/disk6/local/usercache/biadmin/appcache/application_1477084339086_0481/container_e09_1477084339086_0481_01_000001/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
pyspark.sql.utils.AnalysisException: u'Table not found: pokes; line 1 pos 14'
End of LogType:stdout

spark-submit スタンドアロンを実行すると、テーブルが正常に存在することがわかります。

[biadmin@bi4c-xxxxxx-mastermanager ~]$ spark-submit test_pokes.py
…
…
16/12/21 13:09:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 18962 bytes result sent to driver
16/12/21 13:09:13 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 168 ms on localhost (1/1)
16/12/21 13:09:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/12/21 13:09:13 INFO DAGScheduler: ResultStage 0 (collect at /home/biadmin/test_pokes.py:9) finished in 0.179 s
16/12/21 13:09:13 INFO DAGScheduler: Job 0 finished: collect at /home/biadmin/test_pokes.py:9, took 0.236558 s
[Row(foo=238, bar=u'val_238'), Row(foo=86, bar=u'val_86'), Row(foo=311, bar=u'val_311')
…
…

この問題に関連する私の以前の質問を参照してください: hive spark yarn-cluster job fails with: "ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory"

この質問は、他の質問と似ています: Spark は pyspark から Hive テーブルにアクセスできますが、 spark-submit からはアクセスできません。ただし、その質問とは異なり、私は HiveContext を使用しています。


更新: 最終的な解決策については、こちらを参照してください https://stackoverflow.com/a/41272260/1033422

4

3 に答える 3

2

このバグの影響を受けているようです: https://issues.apache.org/jira/browse/SPARK-15345



HDP-2.5.0.0 の Spark 1.6.2 および 2.0.0 で同様の問題が発生しまし
た。私の目標は、次の条件下で、Hive SQL クエリからデータフレームを作成することでした。

  • パイソンAPI、
  • cluster deploy-mode (エグゼキュータ ノードの 1 つで実行されるドライバ プログラム)
  • YARN を使用してエグゼキューター JVM を管理します (スタンドアロンの Spark マスター インスタンスの代わりに)。

最初のテストでは、次の結果が得られました。

  1. spark-submit --deploy-mode client --master local ...=> 作業中
  2. spark-submit --deploy-mode client --master yarn ... =>作業中
  3. spark-submit --deploy-mode cluster --master yarn .... =>機能しない

ケース #3 では、executor ノードの 1 つで実行されているドライバーがデータベースを見つけることができます。エラーは次のとおりです。

pyspark.sql.utils.AnalysisException: 'Table or view not found: `database_name`.`table_name`; line 1 pos 14'



上記のFokko Driesprongの答えは私にとってはうまくいきました。
以下にリストされているコマンドを使用すると、executor ノードで実行されているドライバーは、データベース内の Hive テーブルにアクセスできましたが、これはdefault:

$ /usr/hdp/current/spark2-client/bin/spark-submit \
--deploy-mode cluster --master yarn \
--files /usr/hdp/current/spark2-client/conf/hive-site.xml \
/path/to/python/code.py



Spark 1.6.2 と Spark 2.0.0 でテストするために使用した python コードは次のとおり です。

SPARK_VERSION=2      
APP_NAME = 'spark-sql-python-test_SV,' + str(SPARK_VERSION)



def spark1():
    from pyspark.sql import HiveContext
    from pyspark import SparkContext, SparkConf

    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    hc = HiveContext(sc)

    query = 'select * from database_name.table_name limit 5'
    df = hc.sql(query)
    printout(df)




def spark2():
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(APP_NAME).enableHiveSupport().getOrCreate()
    query = 'select * from database_name.table_name limit 5'
    df = spark.sql(query)
    printout(df)




def printout(df):
    print('\n########################################################################')
    df.show()
    print(df.count())

    df_list = df.collect()
    print(df_list)
    print(df_list[0])
    print(df_list[1])
    print('########################################################################\n')




def main():
    if SPARK_VERSION == 1:
        spark1()
    elif SPARK_VERSION == 2:
        spark2()




if __name__ == '__main__':
    main()
于 2017-03-03T23:26:00.297 に答える