1

AWS Kinesis からデータを読み取る単純なアプリケーションを構築しようとしています。単一のシャードを使用してデータを読み取ることができましたが、4 つの異なるシャードからデータを取得したいと考えています。

問題は、シャードがアクティブである限り反復する while ループがあるため、別のシャードからデータを読み取ることができないことです。これまでのところ、代替アルゴリズムを見つけることも、KCL ベースのソリューションを実装することもできませんでした。よろしくお願いします

public static void DoSomething() {
        AmazonKinesisClient client = new AmazonKinesisClient();
        //noinspection deprecation
        client.setEndpoint(endpoint, serviceName, regionId);  
        /** get shards from the stream using describe stream method*/

        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(streamName);
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
            DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
            shards.addAll(describeStreamResult.getStreamDescription().getShards());
            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
            } else {
                exclusiveStartShardId = null;
            }
        }while (exclusiveStartShardId != null);

        /** shards obtained */
        String shardIterator;

        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shards.get(0).getShardId());
        getShardIteratorRequest.setShardIteratorType("LATEST"); 

        GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();

        while (!shardIterator.equals(null)) {
            getRecordsRequest.setShardIterator(shardIterator);
            getRecordsRequest.setLimit(250);
            GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
            List<Record> records = getRecordsResult.getRecords();

            shardIterator = getRecordsResult.getNextShardIterator();
            if(records.size()!=0) {
                for(Record r : records) {
                    System.out.println(r.getPartitionKey());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
4

2 に答える 2