4

次のエラーが表示されます。

Py4JError(u'o73.createDirectStreamWithoutMessageHandler の呼び出し中にエラーが発生しました。トレース:\npy4j.Py4JException: メソッド createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext、class java.util.HashMap、class java.util .HashSet、クラス java.util.HashMap]) は存在しません\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344) \n\tat py4j.Gateway.invoke(Gateway.java:252)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java: 79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n\n',)

spark-streaming-kafka-assembly_2.10-1.6.0.jar を使用しています (これは、すべてのノード + マスターの /usr/lib/hadoop/lib/ フォルダーにあります)

(編集) 実際のエラーは次のとおりです: java.lang.NoSuchMethodError: org.apache.hadoop.yarn.util.Apps.crossPlatformify(Ljava/lang/String;)Ljava/lang/String;

これは、hadoop のバージョンが間違っていたことが原因でした。したがって、spark は正しい Hadoop バージョンでコンパイルする必要があります。

mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 -DskipTests clean package

これにより、external/kafka-assembly/target フォルダーに jar が作成されます。

4

1 に答える 1

1

イメージ バージョン 1 を使用して、pyspark ストリーミング / kafka の例の wordcountを正常に実行しました

これらの例のそれぞれで、「ad-kafka-inst」は「テスト」トピックを持つ私のテスト kafka インスタンスです。

  1. 初期化アクションなしでクラスターを使用する:

    $ gcloud dataproc jobs submit pyspark --cluster ad-kafka2 --properties spark.jars.packages=org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./kafka_wordcount.py ad-kafka-inst:2181 test 
    
  2. 完全な kafka アセンブリでの初期化アクションの使用:

    • ダウンロード/解凍 spark-1.6.0.tgz
    • ビルド:

      $ mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 package
      
    • spark-streaming-kafka-assembly_2.10-1.6.0.jar を新しい GCS バケット (MYBUCKET など) にアップロードします。
    • 同じ GCS バケットに次の初期化アクションを作成します (例: gs://MYBUCKET/install_spark_kafka.sh)。

      $ #!/bin/bash
      
      gsutil cp gs://MY_BUCKET/spark-streaming-kafka-assembly_2.10-1.6.0.jar /usr/lib/hadoop/lib/
      chmod 755 /usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar 
      
    • 上記の初期化アクションでクラスターを開始します。

      $ gcloud dataproc clusters create ad-kafka-init --initialization-actions gs://MYBUCKET/install_spark_kafka.sh
      
    • ストリーミング ワード カウントを開始します。

      $ gcloud dataproc jobs submit pyspark --cluster ad-kafka-init ./kafka_wordcount.py ad-kafka-inst:2181 test
      
于 2016-03-01T19:38:24.117 に答える