27

次のように、ソースとして Twitter を使用して Spark Streaming の例を実行しようとしています。

public static void main (String.. args) {

    SparkConf conf = new SparkConf().setAppName("Spark_Streaming_Twitter").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);       
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2));      
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);     


        String[] filters = new String[] {"soccer"};

        JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);



         jssc.start();
         jssc.awaitTermination();

}

しかし、私は次の例外を受けています

Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:158)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:416)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:437)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:501)
    at org.learning.spark.TwitterStreamSpark.main(TwitterStreamSpark.java:53)

この問題を解決する方法について何か提案はありますか?

4

3 に答える 3

45

出力演算子が呼び出されると、ストリームの計算がトリガーされます。

DStream に出力演算子がない場合、計算は呼び出されません。基本的に、ストリームで以下のメソッドのいずれかを呼び出す必要があります

print()
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])

http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations

最初に変換を適用してから、必要に応じて関数を出力することもできます。

于 2014-07-04T10:24:28.733 に答える
3

スレッド「メイン」の例外 java.lang.AssertionError: アサーションに失敗しました: 出力ストリームが登録されていないため、何も実行できません

TL;DR、または(またはあまり使用されない or )などの利用可能な出力演算子のいずれかを使用します。printsaveAsTextFilesforeachRDDsaveAsObjectFilessaveAsHadoopFiles

つまり、コード内の次の行の間で出力演算子を使用する必要があります。

JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);
// --> The output operator here <--
jssc.start();

DStreams での Spark の公式ドキュメントの出力操作を引用します(強調表示):

出力操作により、DStream のデータをデータベースやファイル システムなどの外部システムにプッシュすることができます。出力操作では、変換されたデータを外部システムが実際に使用できるようになるため、すべての DStream 変換の実際の実行がトリガーされます (RDD のアクションと同様)

ポイントは、出力演算子がないと、「出力ストリームが登録されていないため、実行するものが何もない」ということです。

あるコメンターが気づいたように、出力変換を使用する必要がprintありforeachRDDますStreamingContext


内部的には、使用可能な出力演算子の 1 つを使用するたびに、またはprint出力ストリームを追加するように要求されます。foreachDStreamGraph

新しい ForEachDStream が作成され、後で登録されたときに登録を見つけることができます(これは、正確にそれを出力ストリームとして追加することです)。

于 2017-12-25T21:01:41.817 に答える
1

また、誤って、この問題を非難することはできませんが、実際の原因は、ストリーミング入力からのスライド ウィンドウ期間と RDD 時間ウィンドウの間の 非倍数です。警告のみをログに記録します。修正すると、コンテキストが失敗しなくなります:D

于 2016-07-14T16:47:24.913 に答える