1

JOIN セマンティクスを実装したいので、Trident トポロジで join メソッドを試します。join がバッチ間で行われていることがわかりました。数百万のタプルを持つ 2 つのストリーム間の結合の場合、1 つのバッチ内で行う必要がありますか?

genderSpout では、各バッチに 3 つのタプルがあるため、Spout は ageSpout の 2 つのバッチを発行し、各バッチには 5 つのタプルがあるため、Spout は 1 つのバッチのみを発行します。

そして、JoinType で LEFT OUTER JOIN を実行します。

テストコードの出力は次のとおりです。

1 man 15
2 woman 18
1 man 19
3 woman NULL
4 man NULL
1 woman NULL

出力から、最初の 4 つの結果は、genderSpout からの最初のバッチと ageSpout からの最初のバッチを結合していることがわかります。そして最後の 2 つの結果は、genderSpout からの 2 番目のバッチと ageSpout からの空のバッチとの間の結合です。したがって、genderSpout LEFT JOIN ageSpout の私の望ましい結果は次のとおりであるため、結果は JOIN セマンティクスに対して正しくありません。

1 man 15
1 man 19
2 woman 18
3 woman NULL
4 man 20
1 woman 15
1 woman 19

私の質問は、JOIN の両側 (Spout) に何百万ものタプルがある場合、正しい結果を得るためにそれらを 1 つのバッチに入れる必要がありますか?

または、私のやり方が間違っています。OUTER JOIN セマンティクスの正しい結果を得るにはどうすればよいか教えてください。

テストコードは次のとおりです。

public static void main(String[] args) throws Exception{
    Fields genderField = new Field("id", "gender");
    FixedBatchSpout genderSpout = new FixedBatchSpout(genderField, 3,
        new Values("1", "man"),
        new Values("2", "woman"),
        new Values("3", "woman"),
        new Values("4", "man"),
        new Values("1", "woman"));
    genderSpout.setCycle(false);

    Fields ageField = new Field("id2", "age");
    FiexedBatchSpout ageSpout = new FixedBatchSpout(new Fields("id2", "age"), 5,
        new Values("1", "15"),
        new Values("4", "20"),
        new Values("2", "18"),
        new Values("1", "19"));
    ageSpout.setCycle(false);

    List<Stream> allStreams = new ArrayList<Stream>();
    List<Fields> allFields = new ArrayList<Fields>();
    List<Fields> joinFileds = new ArrayList<Fields>();
    List<JoinType> joinTypes = new ArrayList<JoinType>();    

    TridentTopology topology = new TridentTopology();

    Stream genderStream = topology.newStream("genderIn", genderSpout);
    Stream ageStream = topology.newStream("ageIn", ageSpout);

    allStreams.add(genderStream);
    allStreams.add(ageStream);

    allFields.add(genderFields);
    allFields.add(ageFields);

    joinFields.add(new Field("id")));
    joinFields.add(new Field("id2"));

    joinTypes.add(JoinType.INNER);
    joinTypes.add(JoinType.OUTER);

    topology.join(allStreams, joinFields, new Filds("id", "gender", "age"), joinTypes)

    LocalCluster cluster = new LocalCluster();

    Config config = new Config()
    config.setDebug(false);
    config.setMaxSpoutPending(3);

    cluster.submitTopology("trident-join-test", config, topology.build());

    Thread.sleep(3000);
    cluster.shutdown();
}
4

1 に答える 1