~30K メッセージ/秒を生成するいくつかの kafka トピックを見ています。これらの 1 つを読み取り、少し (5 秒のウィンドウ) を集約し、(最終的に) DB に書き込むための flink トポロジ設定があります。
トポロジーを実行し、読み取り -> 集約ステップ以外をすべて削除すると、 1 分あたり最大30K メッセージしか取得できません。バックプレッシャーが発生する場所はどこにもありません。
私は何を間違っていますか?
編集:
- トピックスペースについては何も変更できません。各トピックには 1 つのパーティションがあり、何百ものパーティションがあります。
- 各メッセージは、平均 2 ~ 3Kb の圧縮されたリサイクル オブジェクトです。
~1.5 MB/秒しか取得できないようです。v 言及された 100MB/s には近くありません。
現在のコード パス:
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
private String mapId;
public mapper2(String mapId) {
this.mapId = mapId;
}
@Override
public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
tuple4.f0 = timeData.getId();
tuple4.f1 = timeData.getOtherId();
tuple4.f2 = timeData.getSections().size();
tuple4.f3 = mapId;
collector.collect(tuple4);
}
}