3

py4jを使用してpysparkでJava関数を実行しようとしていました。Py4j を使用すると、JVM で Java オブジェクトにアクセスできます。JVM の別のインスタンスを作成し、Java 関数を正常に実行できました。

py4j は、GatewayServer インスタンスを介してこの通信を有効にします。

どうにかして Spark の内部 JVM にアクセスして Java 関数を実行できるかどうか疑問に思っていました。spark の py4j Gatewayserver のエントリ ポイントは何ですか? 関数をエントリ ポイントに追加するにはどうすればよいですか?

4

2 に答える 2

4

これが必要かどうかはわかりませんが、私が見た場所が2つあります。

sc._gateway.jvm

これは java_import に、または直接使用できます

sc._jvm

したがって、パッケージ abc のクラス X にアクセスするには、次のいずれかを実行できます。

jvm = sc._gateway.jvm
java_import(jvm,"a.b.c.X")
instance = a.b.c.X()

またはより直接:

instance = sc._jvm.a.b.c.X()

Java 関数を追加するには、それがクラスパスにあることを確認する必要があります。それをワーカー (UDF など) で使用する場合は、それをワーカーに送信する必要があります。これを実現するには、 --driver-class-path スイッチを使用して spark-submit (または pyspark) をドライバーに追加し、 --jars をワーカーに送信します。

于 2016-03-10T11:28:07.567 に答える
1

見る

$SPARK_HOME/python/pyspark/java_gateway.py

Java/Scala バックエンドとのインターフェースに使用されるメカニズムが表示されます。

次に示すように、1 つ以上の Java ファイルを更新する必要があります。

java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")

これらはSpark-Javaエントリ ポイントを表します。

PysparkSpark-JavaScala に直接アクセスする代わりに、エントリ ポイントを使用します。(a) これらの API クラスで既存のものを使用するか、(b) それらのクラスに新しいエントリ ポイントを追加して独自のバージョンの Spark を構築する必要があります。

于 2016-03-03T18:03:38.197 に答える