1

Aws kinesis ストリーム コンシューマの負荷分散を実装しようとしていました

ドキュメントに従って、私は実装しようとしています

spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json

私は3 つのコンテナーを持っています。必要に応じて、既存のコンテナーを再起動せずに、新しいコンテナー (最大 6 つ)を立ち上げたいと考えています。

  1. instanceIndex は 0 または 1 から始まります。
  2. instanceCount に 6 を指定して 3 つのインスタンスのみを起動すると、新しいインスタンスを起動するまですべてのメッセージが消費されます。
  3. ドキュメントに spring.cloud.stream.bindings..consumer.concurrency というプロパティがありますが、その重要性について教えてください。
  4. 何らかの理由で、いずれかのインスタンスがダウンした場合、いずれかのメッセージが消費されなくなります。

私たちを助けてください

4

1 に答える 1

1

これspring.cloud.stream.bindings..consumer.concurrencyは、消費者ごとの内部オプションです。

adapter.setConcurrency(properties.getConcurrency());

...

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

したがって、これは分散ソリューションでは何もしません。

instanceIndexinstanceCountはバインダーで次のように機能します。

    if (properties.getInstanceCount() > 1) {
        shardOffsets = new HashSet<>();
        KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
        List<Shard> shards = kinesisConsumerDestination.getShards();
        for (int i = 0; i < shards.size(); i++) {
            // divide shards across instances
            if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                KinesisShardOffset shardOffset = new KinesisShardOffset(
                        kinesisShardOffset);
                shardOffset.setStream(destination.getName());
                shardOffset.setShard(shards.get(i).getShardId());
                shardOffsets.add(shardOffset);
            }
        }
    }

したがって、すべてのコンシューマーはストリーム内のシャードのサブセットを取得します。したがって、インスタンスよりも多くのシャードがある場合、一部のシャードが消費されないという事実が発生する可能性があります。

同じシャードから同時にメッセージを消費するものはありません。クラスターごとに 1 つのシャードを消費できるスレッドは 1 つだけです。

于 2019-04-01T17:15:41.207 に答える