1

親愛なる、私はある種のイベント駆動型マイクロサービスをやろうとしています。現在、Quarkus & Smallrye-Reactive メッセージング拡張機能を使用して、メッセージを受信したときに Kafka からメッセージを消費し、データベース レコードを更新することができました。さらに達成したいのは、成功した場合は他のトピックにメッセージを送信し、そうでない場合はエラートピックにメッセージを送信できるようにすることです。return および @outgoing アノテーションを使用して新しいメッセージを送信できることはわかっていますが、私の使用例には合わないと思います。メッセージの消費中にエラーが発生した場合は、ここでガイダンスが必要です。メッセージを元のトピックに返す必要がありますか (メッセージを確認せずに)、それを消費して別のトピックにエラー メッセージを生成し、元のトランザクションをロールバックする必要があります。

これが私のコードです:

@Incoming("new-payment")
public void newMessage(String msg) {
    LOG.info("New payment has been received.");
    LOG.info("Payload is {}", msg);
    PaymentEvent pe = jsob.fromJson(msg, PaymentEvent.class);
    mysqlPool.preparedQuery("select totalBuyers from Book where isbn = ? ",
                    Tuple.of(pe.getIsbn()))
            .thenApply(rs -> {
                RowIterator<Row> iterator = rs.iterator();
                if (iterator.hasNext()) {
                    return iterator.next().getInteger(0) + 1;
                } else {
                    return Integer.valueOf(0);
                }
            })
            .thenApply(totalCount -> {
                return mysqlPool.preparedQuery("update Book set totalBuyers = ?",
                        Tuple.of(totalCount));
            })
            .whenComplete((rs, err) -> {
                if (err != null) {
                    //Emit an error to error topic.
                } else {
                    //Emit a msg to other service.
                }
            });
}

また、より良いコードがあれば送信してください。私はまだリアクティブプログラミングの初心者です:)。

4

2 に答える 2