7

私はトライデントフレームワークを学んでいます。Trident には、インターフェイスを使用してタプルのステートフル マッピングを実行できるこのメソッドを含む、バッチ内の集計タプル用Streamメソッドがいくつかあります。しかし残念なことに、他の 9 つの のオーバーロードのように、引数としてのみ を使用して、マップの状態をさらに永続化するための組み込みの対応物は存在しません。AggregatorpersistentAggregate()Aggregator

では、下位レベルの Trident と Storm の抽象化とツールを組み合わせて、目的の機能を実装するにはどうすればよいでしょうか? Javadoc ドキュメントがほとんどないため、API を調べるのはかなり困難です。

つまり、persistentAggregate()メソッドを使用すると、永続的な状態を更新してストリーム処理を終了できます。

stream of tuples ---> persistent state

ちなみに、永続的な状態を更新し、さまざまなタプルを発行したい:

stream of tuples ------> stream of different tuples
                  with
            persistent state

Stream.aggregate(Fields, Aggregator, Fields)耐障害性を提供​​しません:

stream of tuples ------> stream of different tuples
                  with
          simple in-memory state
4

1 に答える 1

3

TridentState#newValuesStream()メソッドを使用して、状態から新しいストリームを作成できます。これにより、集計値のストリームを取得できます。

説明のために、このメソッドと Debug Filter を追加することで、Trident ドキュメントの例を改善できます。

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"),
    new Values("the man went to the store and bought some candy"),
    new Values("four score and seven years ago"),
    new Values("how many apples can you eat"));
spout.setCycle(true);

TridentTopology topology = new TridentTopology();        
topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
    .newValuesStream().each(new Fields("count"), new Debug());

このトポロジを実行すると、集計されたカウントが (コンソールに) 出力されます。

それが役に立てば幸い

于 2013-11-27T10:29:55.840 に答える