Spark world と Job Server は初めてです
私のコード:
package spark.jobserver
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
object CassandraCQLTest extends SparkJob{
def main(args: Array[String]) {
val sc = new SparkContext("local[4]", "CassandraCQLTest")
sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar");
val config = ConfigFactory.parseString("")
val results = runJob(sc, config)
println("Result is " + "test")
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
Try(config.getString("input.string"))
.map(x => SparkJobValid)
.getOrElse(SparkJobInvalid("No input.string config param"))
}
override def runJob(sc: SparkContext, config: Config): Any = {
val cHost: String = "localhost"
val cPort: String = "9160"
val KeySpace = "retail"
val InputColumnFamily = "ordercf"
val OutputColumnFamily = "salecount"
val job = new Job()
job.setInputFormatClass(classOf[CqlPagingInputFormat])
ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
/** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
/** An UPDATE writes one or more columns to a record in a Cassandra column family */
val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
job.setOutputFormatClass(classOf[CqlOutputFormat])
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
classOf[java.util.Map[String,ByteBuffer]],
classOf[java.util.Map[String,ByteBuffer]])
val productSaleRDD = casRdd.map {
case (key, value) => {
(ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
}
}
val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
aggregatedRDD.collect().foreach {
case (productId, saleCount) => println(productId + ":" + saleCount)
}
val casoutputCF = aggregatedRDD.map {
case (productId, saleCount) => {
val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
var outColFamVal = new ListBuffer[ByteBuffer]
outColFamVal += ByteBufferUtil.bytes(saleCount)
val outVal: java.util.List[ByteBuffer] = outColFamVal
(outKey, outVal)
}
}
casoutputCF.saveAsNewAPIHadoopFile(
KeySpace,
classOf[java.util.Map[String, ByteBuffer]],
classOf[java.util.List[ByteBuffer]],
classOf[CqlOutputFormat],
job.getConfiguration()
)
casRdd.count
}
}
spark-jobServer を使用して Jar をプッシュして実行すると、spark-jobserver ターミナルでこれを取得します
job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
job-server[ERROR] at java.lang.Thread.run(Thread.java:745)
job-server[ERROR] Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
job-server[ERROR] at java.security.AccessController.doPrivileged(Native Method)
job-server[ERROR] at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
job-server[ERROR] ... 8 more
$EXTRA_JAR 変数を cassandra-spark-connector-assembly に既に追加しています。