
I recently migrated from the plain kafka-clients Java API to improve consumer performance, and I'm now using the latest version of akka-reactive-kafka (0.16). My flow is very simple: read from Kafka, group into batches, process each batch, then commit each batch. I ran a benchmark and got very poor results, around 2,000 msg/sec. For the bench I used kafka-producer-perf-test.sh:

bin/kafka-producer-perf-test.sh --topic benchmark-producer --producer.config config/producer.properties --record-size 1322 --num-records 300000 --throughput 10000

28153 records sent, 5630,6 records/sec (7,10 MB/sec), 1187,7 ms avg latency, 2187,0 max latency.
29748 records sent, 5949,6 records/sec (7,50 MB/sec), 3279,2 ms avg latency, 4076,0 max latency.
30528 records sent, 6099,5 records/sec (7,69 MB/sec), 3946,9 ms avg latency, 4043,0 max latency.
28476 records sent, 5694,1 records/sec (7,18 MB/sec), 4278,0 ms avg latency, 4425,0 max latency.
29112 records sent, 5811,9 records/sec (7,33 MB/sec), 4168,8 ms avg latency, 4303,0 max latency.
28404 records sent, 5680,8 records/sec (7,16 MB/sec), 4486,7 ms avg latency, 4603,0 max latency.
29220 records sent, 5837,0 records/sec (7,36 MB/sec), 4081,9 ms avg latency, 4243,0 max latency.
28728 records sent, 5745,6 records/sec (7,24 MB/sec), 4381,9 ms avg latency, 4477,0 max latency.
29088 records sent, 5816,4 records/sec (7,33 MB/sec), 4089,1 ms avg latency, 4238,0 max latency.
28080 records sent, 5614,9 records/sec (7,08 MB/sec), 4472,6 ms avg latency, 4627,0 max latency.
300000 records sent, 5798,446016 records/sec (7,31 MB/sec), 3852,98 ms avg latency, 4627,00 ms max latency, 4103 ms 50th, 4585 ms 95th, 4615 ms 99th, 4623 ms 99.9th.
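As a quick sanity check on the summary line above (a throwaway snippet, not part of the bench itself): multiplying the reported record rate by the record size should reproduce the reported MB/sec.

```scala
// Cross-check the perf-test summary: records/sec × record size ≈ MB/sec.
val recordsPerSec = 5798.446016   // from the summary line above
val recordSizeBytes = 1322        // --record-size passed to the perf test
val mbPerSec = recordsPerSec * recordSizeBytes / (1024 * 1024)
println(f"$mbPerSec%.2f MB/sec")  // ≈ 7.31, matching the tool's output
```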

Below is the Scala code plus the logs:

import java.sql.Timestamp

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

implicit val actorSystem = ActorSystem("benchmark-kafka")
implicit val actorMaterializer = ActorMaterializer()

val consumerSettings =
  ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("mybroker:9094")
    .withGroupId("kafka-producer-bench")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
  .groupedWithin(5000, 1.second)
  .mapAsync(1) { group =>
    println(new Timestamp(System.currentTimeMillis()) + " : Fetch " + group.size + " records")
    Future.successful(group)
  }
  .map { group =>
    group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }
  }
  .mapAsync(1) { batch =>
    println(new Timestamp(System.currentTimeMillis()) + " Will commit : " + batch.getOffsets())
    batch.commitScaladsl()
  }
  .runWith(Sink.ignore)
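For reference, this is the same settings block with the standard kafka-clients fetch/poll knobs spelled out (a sketch only: it reuses the imports from the snippet above, and the values are illustrative, not something I benchmarked):

```scala
// Sketch only: ConsumerSettings with the stock kafka-clients fetch/poll
// properties made explicit. Values below are illustrative, not benchmarked.
val tunedSettings =
  ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("mybroker:9094")
    .withGroupId("kafka-producer-bench")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")
    .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5000") // raise the per-poll record cap
    .withProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536") // let the broker batch up larger fetches
```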

[info] 2017-06-22 18:38:28.456 : Fetch 12 records
[info] 2017-06-22 18:38:28.46 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22970806}
[info] 2017-06-22 18:38:29.456 : Fetch 372 records
[info] 2017-06-22 18:38:29.459 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971178}
[info] 2017-06-22 18:38:30.456 : Fetch 773 records
[info] 2017-06-22 18:38:30.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971951}
[info] 2017-06-22 18:38:31.456 : Fetch 773 records
[info] 2017-06-22 18:38:31.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22972724}
[info] 2017-06-22 18:38:32.456 : Fetch 773 records
[info] 2017-06-22 18:38:32.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22973497}
[info] 2017-06-22 18:38:33.456 : Fetch 1546 records
[info] 2017-06-22 18:38:33.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22975043}
[info] 2017-06-22 18:38:34.456 : Fetch 1546 records
[info] 2017-06-22 18:38:34.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22976589}
[info] 2017-06-22 18:38:35.456 : Fetch 1546 records
[info] 2017-06-22 18:38:35.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22978135}
[info] 2017-06-22 18:38:36.456 : Fetch 2319 records
[info] 2017-06-22 18:38:36.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22980454}
[info] 2017-06-22 18:38:37.456 : Fetch 1546 records
[info] 2017-06-22 18:38:37.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22982000}
[info] 2017-06-22 18:38:38.455 : Fetch 2383 records
[info] 2017-06-22 18:38:38.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22984383}
[info] 2017-06-22 18:38:39.456 : Fetch 1546 records
[info] 2017-06-22 18:38:39.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22985929}
[info] 2017-06-22 18:38:40.456 : Fetch 2319 records
[info] 2017-06-22 18:38:40.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22988248}
[info] 2017-06-22 18:38:41.456 : Fetch 1546 records
[info] 2017-06-22 18:38:41.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22989794}
[info] 2017-06-22 18:38:42.456 : Fetch 1546 records
[info] 2017-06-22 18:38:42.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22991340}
[info] 2017-06-22 18:38:43.455 : Fetch 2319 records
[info] 2017-06-22 18:38:43.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22993659}
[info] 2017-06-22 18:38:44.456 : Fetch 1546 records
[info] 2017-06-22 18:38:44.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22995205}
[info] 2017-06-22 18:38:45.456 : Fetch 2319 records
[info] 2017-06-22 18:38:45.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22997524}
[info] 2017-06-22 18:38:46.456 : Fetch 1546 records
[info] 2017-06-22 18:38:46.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22999070}
[info] 2017-06-22 18:38:47.456 : Fetch 1546 records
[info] 2017-06-22 18:38:47.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23000616}
[info] 2017-06-22 18:38:48.456 : Fetch 2319 records
[info] 2017-06-22 18:38:48.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23002935}
[info] 2017-06-22 18:38:49.456 : Fetch 1546 records
[info] 2017-06-22 18:38:49.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23004481}
[info] 2017-06-22 18:38:50.456 : Fetch 2319 records
[info] 2017-06-22 18:38:50.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23006800}
[info] 2017-06-22 18:38:51.456 : Fetch 1546 records
[info] 2017-06-22 18:38:51.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23008346}
[info] 2017-06-22 18:38:52.455 : Fetch 1546 records
[info] 2017-06-22 18:38:52.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23009892}
[info] 2017-06-22 18:38:53.455 : Fetch 2046 records

For reference, I ran this code on an EC2 t2.medium machine, and with a Kafka client written in Python I got a rate of 5,000 messages/sec, so I think I'm doing something wrong with how I use the reactive API, but I can't find what. From reading this blog about the reactive-kafka benchmark, I should be getting better throughput!


0 Answers