1

最近、データパーサーをJavaのストリームに再実装しようとしていますが、特定のことを行う方法がわかりません:

timeStamp を持つオブジェクト A を考えてみましょう。さまざまな A オブジェクトで構成されているオブジェクト B を考えてみましょう。オブジェクト B の時間範囲を示すいくつかのメトリックを考えてみましょう。

私が今持っているのは、オブジェクトAのリストを通過する状態を持つメソッドであり、最後のオブジェクトBに収まる場合はそこに移動し、そうでない場合は新しいBインスタンスを作成し、そこにオブジェクトAを配置し始めます。

これをストリームの方法で行いたい

オブジェクト A の全リストを取り、それをストリームにします。ここで、「チャンク」を作成し、それらをオブジェクト B に蓄積する関数を理解する必要があります。どうすればよいですか?
ありがとう

編集:

A と B は複雑ですが、簡略化したバージョンをここに投稿しようと思います。

class A {
    private final long time;
    private A(long time) {
        this.time = time;
    }
    long getTime() {
        return time;
    }
}

class B {
     // not important, build from "full" temporaryB class
     // result of accumulation         
}

class TemporaryB {
    private final long startingTime;
    private int counter;

    public TemporaryB(A a) {
        this.startingTime = a.getTime();
    }

    boolean fits(A a) {
        return a.getTime() - startingTime < THRESHOLD;
    }

    void add(A a) {
        counter++;
    }
}

class Accumulator {
    private List<B> accumulatedB;
    private TemporaryBParameters temporaryBParameters
    public void addA(A a) {
         if(temporaryBParameters.fits(a)) {
             temporaryBParameters.add(a)
         } else {
             accumulateB.add(new B(temporaryBParameters)
             temporaryBParameters = new TemporaryBParameters(a)
         }
    } 
}

わかりましたので、これは非常に単純化された方法であり、今これを行う方法です。私はそれが気に入りません。それは醜いです。

4

1 に答える 1

0

一般に、このような問題は Stream API にはあまり適していません。ローカルでない知識が必要になると、並列処理が難しくなるからです。new A(1)new A(2)などをに設定しnew A(3)て まで持っていると想像してください。したがって、基本的には、入力を 10 個の要素でバッチに結合する必要があります。ここで、この回答で説明したのと同じ問題があります。タスクをサブタスクに分割すると、サフィックス部分はプレフィックス部分に含まれる要素の数を正確に認識できない可能性があるため、プレフィックス全体が処理されるまで、データをバッチに結合することさえ開始できません。 . あなたの問題は本質的にシリアルです。new A(1000)Threshold10

一方、私のStreamExライブラリには新しいheadTailメソッドによる解決策があります。この方法は並列化がうまくいきませんが、ほとんどすべての操作をわずか数行で定義できます。

で問題を解決する方法は次のheadTailとおりです。

static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
    return input.headTail((head, tail) ->
        tb == null ? combine(tail, new TemporaryB(head)) :
            tb.fits(head) ? combine(tail, tb.add(head)) :
                combine(tail, new TemporaryB(head)).prepend(tb), 
        () -> StreamEx.ofNullable(tb));
}

ここで私はあなたのTemporaryB方法をこのように修正しました:

TemporaryB add(A a) {
    counter++;
    return this;
}

サンプル ( Threshold= 1000 と仮定):

List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001), new A(
        1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));

Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);

出力 (シンプルに書きましたB.toString()):

B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]

したがって、ここでは実際に の遅延StreamがありBます。


説明:

StreamEx.headTailパラメータは 2 つのラムダです。First は、入力ストリームが空でない場合に最大 1 回呼び出されます。最初のストリーム要素 (head) と、他のすべての要素を含むストリーム (tail) を受け取ります。2 番目は、入力ストリームが空でパラメーターを受け取らない場合に、最大で 1 回呼び出されます。両方とも、代わりに使用される出力ストリームを生成する必要があります。ここにあるのは次のとおりです。

return input.headTail((head, tail) ->

tb == nullが開始ケースで、TemporaryBから new を作成し、headで self を呼び出しますtail

    tb == null ? combine(tail, new TemporaryB(head)) :

tb.fits(head)? headわかりました、を既存のものに追加し、tbself を次のように呼び出しますtail

        tb.fits(head) ? combine(tail, tb.add(head)) :

それ以外の場合は、再度 new を作成しますTemporaryB(head)が、出力ストリームの前に現在のtb要素を追加します (実際には、新しい要素をターゲット ストリームに放出します)。

            combine(tail, new TemporaryB(head)).prepend(tb), 

入力ストリームが使い果たされていますか? OK、tbもしあれば最後に収集されたものを返します:

    () -> StreamEx.ofNullable(tb));

headTail実装は、再帰的に見える一方で、そのようなソリューションがスタックとヒープを一定量以上消費しないことを保証することに注意してください。疑問がある場合は、何千もの入力要素で確認できます。

Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);
于 2016-01-22T06:57:55.340 に答える