0

と の両方を使用してKPLKCLサービス間でデータを交換するようになりました。ただしconsumer service、 がオフラインになると、 から送信されたすべてのデータKPLが永久に失われます。そのため、稼働中に送信されたデータのチャンクのみを取得consumer serviceし、shardConsumer準備ができています。最後に消費されたポイントから開始するか、何らかの形で残ったデータを処理する必要があります。

これが私のShardProcessorコードです:

@Override
    public void initialize(InitializationInput initializationInput) {

    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records()
                .forEach(record -> {
                    //my logic
                });
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {

    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shard Ended", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shutdown Requested", e);

        }
    }

構成コード:

public void configure(String streamName, ShardRecordProcessorFactory factory) {

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisAsyncClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                        UUID.randomUUID().toString(), factory);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }
4

1 に答える 1