So I am trying to get a simple sample program working with Java and the spark-cassandra-connector. Running sbt assembly works fine, and I get a fat jar to submit to Spark. Here is where the problem comes in: when I submit the job to Spark, I get the following error:
vagrant@cassandra-spark:~$ source submit-job.sh
Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/japi/CassandraJavaUtil
at JavaTest.main(JavaTest.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 10 more
Here is the submit-job.sh script:
#!/usr/bin/env bash
~/spark/bin/spark-submit --driver-class-path ~/JavaTest/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar ~/JavaTest/target/scala-2.10/CassSparkTest-assembly-1.0.jar
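One thing worth noting (an assumption on my part, not something confirmed above): `--driver-class-path` only adds the jar to the driver's classpath, while `--jars` ships it to the executors as well. A variant of the script using `--jars` might look like this (same paths as the original script):

```shell
#!/usr/bin/env bash
# Sketch: pass the connector assembly via --jars so it is distributed to the
# executors too, instead of --driver-class-path (which affects the driver only).
~/spark/bin/spark-submit \
    --jars ~/JavaTest/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar \
    ~/JavaTest/target/scala-2.10/CassSparkTest-assembly-1.0.jar
```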
And here is my build.sbt file:
lazy val root = (project in file(".")).
  settings(
    name := "CassSparkTest",
    version := "1.0"
  )

libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5" % "provided",
  "org.apache.cassandra" % "cassandra-thrift" % "2.1.5" % "provided",
  "org.apache.cassandra" % "cassandra-clientutil" % "2.1.5" % "provided",
  //"com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-M1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.3.0-M1" % "provided",
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.3.1" % "provided",
  "org.apache.commons" % "commons-lang3" % "3.4" % "provided"
)
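An alternative I have been considering (again an assumption, not a confirmed fix): because the connector dependency is marked `% "provided"`, sbt-assembly excludes it from the fat jar, so its classes exist only if some external classpath supplies them. Dropping `provided` from the connector line would let sbt-assembly bundle it, removing the need for extra classpath flags. A sketch of the changed dependency block (versions copied from above; also note the jar in the submit script is `1.3.0-M2-SNAPSHOT` while this declares `1.3.0-M1`, a mismatch that may or may not matter here):

```scala
libraryDependencies ++= Seq(
  // Bundle the connector into the fat jar: no "provided" scope here.
  "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.3.0-M1",
  // Spark itself stays "provided" -- the cluster supplies it at runtime.
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.3.1" % "provided"
)
```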
And below is the code, which compiles:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.commons.lang3.StringUtils;
public class JavaTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.10.11:7077", "test", conf);
        JavaRDD<String> cassandraRowsRDD = javaFunctions(sc).cassandraTable("ks", "test")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
    }
}
sbt assembly works fine, but the class definition cannot be found once the job is actually submitted.