と の両方を使用してKPL
、KCL
サービス間でデータを交換するようになりました。ただしconsumer service
、 がオフラインになると、 から送信されたすべてのデータKPL
が永久に失われます。そのため、稼働中に送信されたデータのチャンクのみを取得consumer service
し、shardConsumer
準備ができています。最後に消費されたポイントから開始するか、何らかの形で残ったデータを処理する必要があります。
これが私のShardProcessor
コードです:
@Override
public void initialize(InitializationInput initializationInput) {
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.records()
.forEach(record -> {
//my logic
});
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shard Ended", e);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shutdown Requested", e);
}
}
構成コード:
public void configure(String streamName, ShardRecordProcessorFactory factory) {
Region region = Region.of(awsRegion);
KinesisAsyncClient kinesisAsyncClient =
KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder =
new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
UUID.randomUUID().toString(), factory);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}