これspring.cloud.stream.bindings..consumer.concurrency
は、消費者ごとの内部オプションです。
adapter.setConcurrency(properties.getConcurrency());
...
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
したがって、これは分散ソリューションでは何もしません。
instanceIndex
とinstanceCount
はバインダーで次のように機能します。
if (properties.getInstanceCount() > 1) {
shardOffsets = new HashSet<>();
KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
List<Shard> shards = kinesisConsumerDestination.getShards();
for (int i = 0; i < shards.size(); i++) {
// divide shards across instances
if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
KinesisShardOffset shardOffset = new KinesisShardOffset(
kinesisShardOffset);
shardOffset.setStream(destination.getName());
shardOffset.setShard(shards.get(i).getShardId());
shardOffsets.add(shardOffset);
}
}
}
したがって、すべてのコンシューマーはストリーム内のシャードのサブセットを取得します。したがって、インスタンスよりも多くのシャードがある場合、一部のシャードが消費されないという事実が発生する可能性があります。
同じシャードから同時にメッセージを消費するものはありません。クラスターごとに 1 つのシャードを消費できるスレッドは 1 つだけです。