1

私は Spark Streaming の初心者で、単一の (K,V) ペアの例をたくさん見つけたので、この問題を処理する方法を見つけようとして行き詰まりました。Java で Spark の変換を使用する最善のアプローチを見つけるために、いくつかの助けをいただければ幸いです。

シナリオを簡単に説明すると、

目標は、時間枠内の一連の要素のエラー率を取得することです。

次の入力を考えると、

(A, Error)
(B, Success)
(B, Error)
(B, Success)
(C, Success)
(C, Error)

要素ごとに集計し、次に status で集計します(Element, (Number of Success, Number of Error))。この場合、変換の結果は次のようになります。

(A, (0,1))
(B, (2,1))
(C, (1,1))

そして最後に (i1,i2) -> i1/(i1+i2) のような関数を使った比率計算です。

(A, 100%)
(B, 33.3%)
(C, 50%)

私が理解している限り、結果はreduceByKeyAndWindow()関数によって与えられます。たとえば、

JavaPairDStream<String, Double> res = 
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1));

アプリケーションの逆の流れに従って、私の質問は、

複数の値またはキー (おそらく のようなものJavaPairDStream<String, Tuple2<Integer,Integer>>) を持つ JavaPairDStream でペアを定義する方法は?

reduceFunc複数のキーを持つ特定のペアに最適なアプローチはどれですか?

最初の DStream をマップする最良の方法はどれですか (おそらく のようなものJavaDStream<Tuple2<String, String>> line = input.map(func))?

よろしくお願いいたします。

4

1 に答える 1

2

私はすでに解決策を見つけました。関数クラスとタプルを操作すると、Scala で構築する任意の組み合わせを見つけることができます。問題は、Java でこれに関連するドキュメントや例が見つからなかったことです。以下に、将来誰かを助けることができる場合に備えて、私の解決策を示します。

JavaPairDStream<String,String> samples = lines.flatMapToPair(new PairFlatMapFunction<String,String, String>() {
            public Iterator<Tuple2<String,String>> call(String s) throws Exception {
                return Arrays.asList(new Tuple2<String, String>(//Some logic on my data//).iterator();
            }
        });


JavaPairDStream<Tuple2<String,String>, Integer> samplePairs = samples.mapToPair(
               new PairFunction<Tuple2<String,String>, Tuple2<String,String>, Integer>() {
                    public Tuple2<Tuple2<String,String>, Integer> call(Tuple2<String,String> t) {
                        return new Tuple2<Tuple2<String,String>, Integer>(t, 1);
                    }
                });

        JavaPairDStream<String, Integer> countErrors = samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() {
            public Boolean call(Tuple2<Tuple2<String,String>, Integer> t)
            {
               return (t._1._2.equals("Error"));
           }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() {
            public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) {
                return new Tuple2(t._1._1,t._2);
            }
        }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }}, Durations.seconds(30), Durations.seconds(1));

        JavaPairDStream<String, Integer> countSuccess= samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() {
            public Boolean call(Tuple2<Tuple2<String,String>, Integer> t)
            {
                return (t._1._2.equals("Success"));
            }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() {
            public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) {
                return new Tuple2(t._1._1,t._2);
            }
        }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }}, Durations.seconds(30), Durations.seconds(1));

        JavaPairDStream<String,Tuple2<Optional<Integer>,Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors);

        JavaPairDStream<String, Double> mappedRDD = countPairs
                .mapToPair(new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() {
                    public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> stringTuple2Tuple2) throws Exception {
                        if ((stringTuple2Tuple2._2()._2().isPresent()) && (stringTuple2Tuple2._2()._1().isPresent())) {
                            return new Tuple2<String, Double>(stringTuple2Tuple2._1(), ((double)stringTuple2Tuple2._2()._1().get() /
                                    ((double)stringTuple2Tuple2._2()._2().get()+(double)stringTuple2Tuple2._2()._1().get())));
                        } else if (stringTuple2Tuple2._2()._2().isPresent()){
                            return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 1.0);
                        } else {
                            return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 0.0);
                        }
                    }
                });
于 2016-11-21T18:42:38.073 に答える