3

~30K メッセージ/秒を生成するいくつかの kafka トピックを見ています。これらの 1 つを読み取り、少し (5 秒のウィンドウ) を集約し、(最終的に) DB に書き込むための flink トポロジ設定があります。

トポロジーを実行し、読み取り -> 集約ステップ以外をすべて削除すると、 1 分あたり最大30K メッセージしか取得できません。バックプレッシャーが発生する場所はどこにもありません。

私は何を間違っていますか?


編集:

  1. トピックスペースについては何も変更できません。各トピックには 1 つのパーティションがあり、何百ものパーティションがあります。
  2. 各メッセージは、平均 2 ~ 3Kb の圧縮されたリサイクル オブジェクトです。

~1.5 MB/秒しか取得できないようです。v 言及された 100MB/s には近くありません。

現在のコード パス:

DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);  
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);

public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;

        collector.collect(tuple4);
    }
}
4

2 に答える 2

5

コードから、パフォーマンスの問題を引き起こす可能性のある 2 つの潜在的なコンポーネントがわかりました。

  • FlinkKafkaConsumer
  • Thrift デシリアライザー

ボトルネックがどこにあるかを理解するために、最初に Kafka トピックからの Flink 読み取りの生の読み取りパフォーマンスを測定します。

したがって、クラスターで次のコードを実行できますか?

public class RawKafka {

private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);

    dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
        long received = 0;
        long logfreq = 50000;
        long lastLog = -1;
        long lastElements = 0;

        @Override
        public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
            received++;
            if (received % logfreq == 0) {
                // throughput over entire time
                long now = System.currentTimeMillis();

                // throughput for the last "logfreq" elements
                if(lastLog == -1) {
                    // init (the first)
                    lastLog = now;
                    lastElements = received;
                } else {
                    long timeDiff = now - lastLog;
                    long elementDiff = received - lastElements;
                    double ex = (1000/(double)timeDiff);
                    LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                            timeDiff, elementDiff, elementDiff*ex, (received * 2500) / 1024 / 1024 / 1024);
                    // reinit
                    lastLog = now;
                    lastElements = received;
                }
            }
        }
    });

    env.execute("Raw kafka throughput");
}
}

このコードは、Kafka からの 50k 要素間の時間を測定し、Kafka から読み取られた要素の数をログに記録しています。私のローカル マシンでは、約 33 万要素/コア/秒のスループットが得られました。

16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0

Kafka からの読み取りで達成されているスループットを確認することに非常に興味があります。

于 2015-11-25T15:15:56.787 に答える