4

標準のスパウトとボルトの組み合わせを使用してストリーミング集約を行うことができ、ティックタプルを使用して一定の間隔でデータを永続化してバッチ処理を利用する場合、満足のいくケースで非常にうまく機能します。現在、私はいくつかの障害管理(保存されていないタプルの追跡など)を自分で行っています(つまり、嵐からのootbではありません)

しかし、トライデントがより高い抽象化とより優れた障害管理を提供することを読んだことがあります。私が理解していないのは、トライデントにティックタプルのサポートがあるかどうかです。基本的に、現在の分程度のメモリにバッチ処理し、トライデントを使用して前の分の集計データを保持したいと考えています。

ここでの指針やデザインの提案は役に立ちます。

ありがとう

4

1 に答える 1

0

実際、マイクロバッチ処理は組み込みの Trident の機能です。そのためにティックタプルは必要ありません。コードに次のようなものがある場合:

topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider),
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater()
    )

(ここでは、カスタム ElasticSearch 状態/アップデーターを使用しています。他のものを使用することもできます)

したがって、このような場合、ボンネットの下で Trident はストリームをバッチにグループ化し、個々のタプルではなくそれらのバッチに対して partitionPersist 操作を実行します。

何らかの理由でティック タプルが必要な場合は、ティック スパウトを作成してください。

public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";
    private final long delay;

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}
于 2015-12-15T22:32:23.050 に答える