hdfsフォルダー(sparkstreamining)からストリーミングしようとしています:
SparkConf sparkConf = new SparkConf()
.setMaster("spark://quickstart.cloudera:7077")
.setAppName("BigData");
JavaStreamingContext ssc =
new JavaStreamingContext(sparkConf, new Duration(2000));
ssc.textFileStream("hdfs://user/cloudera/events/")
それが私がした良いフォルダかどうかを確認するために
hadoop fs -ls
JavaDStream にデータが含まれていないという戻り値でエラーが発生していません。フォルダにはすでにいくつかのファイルが含まれています..
フォルダパスを確認する他の方法はありますか? 他にチェックすべき点はありますか?
注:ローカルファイルから読み取ってストリームに入れようとしましたが、まだ空の入力があります(この試行のコードは次のとおりです)。PC上のファイルの場所をspark-shell ..input.print()行がjava.io.Exceptionを与えています...
System.out.println( "Create context(es)" );
SparkConf sparkConf = new SparkConf()
.setMaster("spark://quickstart.cloudera:7077")
.setAppName("BigData")
.setSparkHome("/usr/lib/spark")
.setJars(new String[]{"target/standard-to-self-explicit-0.0.1-SNAPSHOT.jar"});
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkConf sparkConf2 = new SparkConf()
.setMaster("spark://quickstart.cloudera:7077")
.setAppName("BigData2")
.setJars(new String[]{"target/standard-to-self-explicit-0.0.1-SNAPSHOT.jar"});
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf2, new Duration(2000));
System.out.println( "Set input file" );
JavaRDD<String> inputRDD1 = sc.textFile(inputFile+"1.json");
JavaRDD<String> inputRDD2 = sc.textFile(inputFile+"2.json");
Queue<JavaRDD<String>> inputRDDQueue = new LinkedList<JavaRDD<String>>();
inputRDDQueue.add(inputRDD1);
inputRDDQueue.add(inputRDD2);
System.out.println("debug 1: "+(String)inputRDD1.toDebugString());
//System.out.println("first 1: "+inputRDD1.take(3).toString());
JavaDStream<String> input = ssc.queueStream(inputRDDQueue);
input.print();
ssc.start();
ssc.awaitTermination();
また、この 2 番目のバージョンのログは、入力に問題があることを示しています。
RROR JobScheduler: Error running job streaming job 1416306522000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 78, quickstart.cloudera): **java.io.IOException: unexpected exception type**
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
ありがとうございました!