0

librdkafka のラッパーであるcppkafka ライブラリを使用しており、非常に単純なメッセージ ストリーミング タスク用の C++ Kafka クライアントを使用しています。メッセージを受信するのにかなり時間がかかるため、私の消費者クラスはおかしな動作をしています。より正確には、受信実行可能ファイルが実行され、実行が継続されるたびに、コンシューマーはメッセージの最初のバッチを正しく受信できますが、後続のメッセージが到着するまでに約 15 秒かかります。どのような可能性がこのようなものにつながる可能性があるか(カフカ構成、ライブラリ固有の問題、または私の愚かな障害)を誰でも理解していますか? 百万の感謝。

私の受信スレッドは次のとおりです

configuration_.set("group.id", 0);
consumer_ = std::make_unique<cppkafka::Consumer>(configuration_);
consumer_->subscribe({TopicTraits<trade::OrderRequest>::topic, TopicTraits<trade::CancelRequest>::topic});
std::thread([this] {
  while (working_) {
    cppkafka::Message msg = consumer_->poll();
    if (msg) {
      if (msg.get_error()) {
        if (!msg.is_eof()) {
          ERROR("error occurred while polling message: {}", msg.get_error());
        }
      } else {
        try {
          Json j = Json::parse(msg.get_payload());
          if (msg.get_topic() == TopicTraits<trade::OrderRequest>::topic) {
            INFO("received [order_req], {}", msg.get_payload());
            ReceiveOrderRequest(j.get<trade::OrderRequest>());
          } else if (msg.get_topic() == TopicTraits<trade::CancelRequest>::topic) {
            INFO("received [cancel_req], {}", msg.get_payload());
            ReceiveCancelRequest(j.get<trade::CancelRequest>());
          }
        } catch (const std::exception &e) {
          ERROR("error occurred while handling incoming message, {}", e.what());
        }
      }
    }
  }
}).detach();
4

1 に答える 1