0

これは、以前のリアクティブ カフカの問題 (リアクティブ カフカにデータのフラックスを送信している間の問題) に続く質問です。

リアクティブなアプローチを使用して、いくつかのログ レコードを kafka に送信しようとしています。これは、リアクティブ カフカを使用してメッセージを送信するリアクティブ コードです。

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {
    
        AtomicInteger sentCount = new AtomicInteger(0);
        sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            Thread.sleep(0, 5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        })).doOnNext(res -> sentCount.incrementAndGet()).then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' failed. "
                    + e, topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
    }

}


public class ExecuteQuery implements Runnable {

    private LogProducer producer = new LogProducer("localhost:9092");

    @Override
    public void run() {
        Flux<Logs.Data> records = ...
        producer.sendMessages(kafkaTopic, records);
        .....
        .....
        // processing related to the messages sent
    }

}

そのため、 が存在する場合でも、Thread.sleep(0, 5);すべてのメッセージを kafka に送信しない場合があり、プログラムは早期に SUCCESS メッセージを出力します ( log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);)。この問題を解決する具体的な方法はありますか。たとえば、何らかのコールバックを使用して、スレッドがすべてのメッセージが正常に送信されるまで待機するようにします。

私は春のコンソールアプリケーションを持っていて、固定レートでスケジューラを介して ExecuteQuery を実行しています。

public class Main {

private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

public static void main(String[] args) {
     QueryScheduler scheduledQuery = new QueryScheduler();
     scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
}

class QueryScheduler implements Runnable {

  @Override
  public void run() {
      // preprocessing related to time
      executor.execute(new ExecuteQuery());
      // postprocessing related to time
  }

}
}
4

1 に答える 1