実際、マイクロバッチ処理は組み込みの Trident の機能です。そのためにティックタプルは必要ありません。コードに次のようなものがある場合:
topology
.newStream("myStream", spout)
.partitionPersist(
ElasticSearchEventState.getFactoryFor(connectionProvider),
new Fields("field1", "field2"),
new ElasticSearchEventUpdater()
)
(ここでは、カスタム ElasticSearch 状態/アップデーターを使用しています。他のものを使用することもできます)
したがって、このような場合、ボンネットの下で Trident はストリームをバッチにグループ化し、個々のタプルではなくそれらのバッチに対して partitionPersist 操作を実行します。
何らかの理由でティック タプルが必要な場合は、ティック スパウトを作成してください。
public class TickSpout implements IBatchSpout {
public static final String TIMESTAMP_FIELD = "timestamp";
private final long delay;
public TickSpout(long delay) {
this.delay = delay;
}
@Override
public void open(Map conf, TopologyContext context) {
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
Utils.sleep(delay);
collector.emit(new Values(System.currentTimeMillis()));
}
@Override
public void ack(long batchId) {
}
@Override
public void close() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
return new Fields(TIMESTAMP_FIELD);
}
}