16

eclipse (maven conf を使用) を 2 つのワーカーで使用して以下のコードを実行しようとしており、それぞれに 2 つのコアがあるか、spark-submit でも試しました。

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

そしてそのコードのログ

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s
Sentences Collected from files []
-------------------------------------------
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
Time: 1421944033000 ms
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s)
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

問題は、ディレクトリにあるファイルからデータを取得していないことです。私を助けてください。

4

7 に答える 7

13

別のディレクトリで試してから、ジョブの実行中にこれらのファイルをそのディレクトリにコピーします。

于 2015-01-22T19:22:19.577 に答える
5

同じ問題がありました。これが私のコードです:

lines = jssc.textFileStream("file:///Users/projects/spark/test/data');

TextFileSTream は非常に機密性が高いです。私がやったことは:

1. Run Spark program
2. touch datafile
3. mv datafile datafile2
4. mv datafile2  /Users/projects/spark/test/data

そしてそれはそれをしました。

于 2015-10-09T05:31:46.233 に答える
2

スキームを追加する必要があると思います。つまりfile://hdfs://パスの前に追加します。


コメントの編集を元に戻す理由:実際にfile://hdfs://パスの「前」に追加する必要があるため、合計パスはfile:///tmp/file.txtまたはになりhdfs:///user/dataます。構成に NameNode が設定されていない場合、後者は である必要がありますhdfs://host:port/user/data

于 2015-01-23T07:51:30.430 に答える
0

textFileStreamフォルダ内のファイルが追加または更新されている場合にのみ、フォルダを監視できます。

ファイルを読みたいだけなら、代わりにSparkContext.textFile.

于 2016-11-29T03:57:02.107 に答える
0

JavaDoc は、関数が新しいファイルのみをストリーミングすることを示唆しています。

参照: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

新しいファイルの Hadoop 互換ファイルシステムを監視し、それらをテキスト ファイルとして読み取る入力ストリームを作成します (キーを LongWritable として、値を Text として、入力形式を TextInputFormat として使用します)。ファイルは、同じファイル システム内の別の場所から「移動」して、監視対象のディレクトリに書き込む必要があります。で始まるファイル名。は無視されます。

于 2016-04-25T21:37:14.873 に答える