2

Spark world と Job Server は初めてです

私のコード:

package spark.jobserver 

import java.nio.ByteBuffer 

import scala.collection.JavaConversions._ 
import scala.collection.mutable.ListBuffer 
import scala.collection.immutable.Map 

import org.apache.cassandra.hadoop.ConfigHelper 
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper 
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat 
import org.apache.cassandra.utils.ByteBufferUtil 
import org.apache.hadoop.mapreduce.Job 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 

object CassandraCQLTest extends SparkJob{ 

  def main(args: Array[String]) {   
    val sc = new SparkContext("local[4]", "CassandraCQLTest") 
    sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar"); 
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Result is " + "test") 
  } 

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = { 
    Try(config.getString("input.string")) 
      .map(x => SparkJobValid) 
      .getOrElse(SparkJobInvalid("No input.string config param")) 
  } 

  override def runJob(sc: SparkContext, config: Config): Any = { 
    val cHost: String = "localhost" 
    val cPort: String = "9160" 
    val KeySpace = "retail" 
    val InputColumnFamily = "ordercf" 
    val OutputColumnFamily = "salecount" 

    val job = new Job() 
    job.setInputFormatClass(classOf[CqlPagingInputFormat]) 
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) 
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) 
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) 
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") 
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") 

    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */ 

    /** An UPDATE writes one or more columns to a record in a Cassandra column family */ 
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? " 
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query) 

    job.setOutputFormatClass(classOf[CqlOutputFormat]) 
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) 
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) 
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) 
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") 

    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), 
      classOf[CqlPagingInputFormat], 
      classOf[java.util.Map[String,ByteBuffer]], 
      classOf[java.util.Map[String,ByteBuffer]]) 


    val productSaleRDD = casRdd.map { 
      case (key, value) => { 
        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity"))) 
      } 
    } 
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) 
    aggregatedRDD.collect().foreach { 
      case (productId, saleCount) => println(productId + ":" + saleCount) 
    } 

    val casoutputCF  = aggregatedRDD.map { 
      case (productId, saleCount) => { 
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) 
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey 
        var outColFamVal = new ListBuffer[ByteBuffer] 
        outColFamVal += ByteBufferUtil.bytes(saleCount) 
        val outVal: java.util.List[ByteBuffer] = outColFamVal 
       (outKey, outVal) 
      } 
    } 

    casoutputCF.saveAsNewAPIHadoopFile( 
        KeySpace, 
        classOf[java.util.Map[String, ByteBuffer]], 
        classOf[java.util.List[ByteBuffer]], 
        classOf[CqlOutputFormat], 
        job.getConfiguration() 
      ) 
    casRdd.count 
  } 
} 

spark-jobServer を使用して Jar をプッシュして実行すると、spark-jobserver ターミナルでこれを取得します

job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat 
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46) 
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21) 
job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235) 
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
job-server[ERROR] at java.lang.Thread.run(Thread.java:745) 
job-server[ERROR] Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
job-server[ERROR] at java.security.AccessController.doPrivileged(Native Method) 
job-server[ERROR] at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
job-server[ERROR] ... 8 more 

$EXTRA_JAR 変数を cassandra-spark-connector-assembly に既に追加しています。

4

2 に答える 2

0

CqlPagingInputFormat は cassandra-all バージョン 2.0.4 にあり、それ以降のバージョンにはありません。実行時に、アプリケーションは 2.0.4 以上の cassandra バージョンを使用しています。この依存関係を ur pom に追加する必要があります。

<dependency>
  <groupId>org.apache.cassandra</groupId>
  <artifactId>cassandra-all</artifactId>   
  <version>2.0.4</version>

このクラスを取得します。

しかし、他のことがうまくいくとは限りません。

于 2016-04-26T14:14:15.957 に答える
0

プログラムを Spark に送信するときは、依存するすべての jar ファイル (カンマ区切りのリスト) を含める必要があります。プロジェクトの構造が次のようになっているとします。

 simpleapp
  - src/main/java
    - org.apache.spark.examples
      - SimpleApp.java
  - lib
    - dependent.jars (you can put all dependent jars inside lib   directory)
  - target
    - simpleapp.jar (after compiling your source)

したがって、以下のコマンドを使用できます。

spark-submit --jars $(echo lib/*.jar | tr ' ' ',' ) --class org.apache.spark.examples.SimpleApp --master local[2]  target/simpleapp.jar

さらに、spark Web コンソールを使用して jar の配布を確認できます。プログラム -> 環境に移動し、spark によって不平を言われた jar ファイルが既に存在するかどうかを確認します。

于 2015-04-25T01:25:39.120 に答える