0

ローカル マシンで動作する単純な Spark-Streaming の例を取得しようとしています。
As/Bs/Cs をソケットに書き込むスレッドがあります。

serverSocket = new ServerSocket(Constants.PORT);
s1 = serverSocket.accept();
while(true) {
    Thread.sleep(random.nextInt(100));
    String character = alphabet.get(random.nextInt(alphabet.size())) ;
    PrintWriter out = new PrintWriter(s1.getOutputStream());
    out.println(character);
    out.flush();
}

As/Bs/Cs の数を数えようとする私のメイン プログラムは次のようになります (reduce ステップなし):

public static void main(String[] args) {
    // start socket writer thread
    System.setProperty("spark.cleaner.ttl", "10000");
    JavaSparkContext sc = new JavaSparkContext(
            "local", 
            "Test",
            Constants.SPARK_HOME, 
            new String[]{"target/spark-standalone-0.0.1-SNAPSHOT.jar"});
    Duration batchDuration = new Duration(TIME_WINDOW_MS);
    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, batchDuration);
    JavaDStream<String> stream = streamingContext.socketTextStream("localhost", Constants.PORT);
    stream.print();
    JavaPairDStream<String, Long> texts = stream.map(new PairFunction<String, String, Long>() {

            @Override
            public Tuple2<String, Long> call(String t) throws Exception {
                return new Tuple2<String, Long>("batchCount" + t, 1l);
            }

        });
     texts.print();
     streamingContext.checkpoint("checkPointDir");
     streamingContext.start();

この場合、すべて正常に動作します (バッチのサンプル出力):

Time: 1372413296000 ms
-------------------------------------------
B
A
B
C
C
C
A
B
C
C
...

-------------------------------------------
Time: 1372413296000 ms
-------------------------------------------
(batchCountB,1)
(batchCountA,1)
(batchCountB,1)
(batchCountC,1)
(batchCountC,1)
(batchCountC,1)
(batchCountA,1)
(batchCountB,1)
(batchCountC,1)
(batchCountC,1)
...

しかし、マップの後に削減ステップを追加すると、機能しなくなります。このコードは texts.print() の後に続きます

JavaPairDStream<String, Long> reduced = texts.reduceByKeyAndWindow(new Function2<Long, Long, Long>() {

    @Override
    public Long call(Long t1, Long t2) throws Exception {
        return t1 + t2;
    }
    }, new Duration(TIME_WINDOW_MS));
reduced.print();

この場合、最初の「ストリーム」変数と「テキスト」変数の出力のみを取得し、reduce の出力は取得しません。また、この最初のバッチ処理の後は何も起こりません。また、spark ログ レベルを DEBUG に設定しましたが、例外やその他の奇妙なことは発生しませんでした。

そこで何が起こるの?ロックされるのはなぜですか?

4

1 に答える 1

2

記録のために: Spark ユーザーグループで回答を得ました。
エラーは、使用する必要があることです

"local[2]"

それ以外の

"local"

同時処理を有効にするために、Spark コンテキストをインスタンス化するパラメーターとして。

于 2013-07-01T08:03:42.740 に答える