ですから、これは以前に何度も尋ねられたことは知っていますが、多くのことを試しましたが、何もうまくいかないようです.
これらのブログ/記事/コードから始めましょう:
- https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
- https://jimbaca.com/rxjava-retrywhen/
- http://blog.inching.org/RxJava/2016-12-12-rx-java-error-handling.html
- https://pamartinezandres.com/rxjava-2-exponential-backoff-retry-only-when-internet-is-available-5a46188ab175
- https://gist.github.com/wotomas/35006d156a16345349a2e4c8e159e122
そして他の多く。
簡単に言えば、それらはすべて、retryWhen を使用して指数バックオフを実装する方法を説明しています。このようなもの:
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
System.out.println("retry count " + retryCount);
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
ただし、これといくつかのかなり類似したバリエーションを試しましたが、ここで説明する価値はありませんが、何も機能していないようです. 例が機能し、ブロックしているサブスクライバーを使用している方法がありますが、スレッドのブロックを避けたいです。
したがって、前のオブザーバブルに次のようなブロッキング サブスクライバーを適用するとします。
.blockingForEach(System.out::println);
期待どおりに動作します。しかし、それはアイデアではありません。試してみると:
.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
フローは一度しか実行されないため、達成したいことではありません。
それは私がしようとしているように使用できないことを意味しますか? ドキュメントから、私の要件を達成しようとするのは問題ではないようです。
私は何が欠けているのですか?
ティア。
編集:これをテストしている2つの方法があります:
テスト方法 (testng を使用):
Observable<Integer> source =
Observable.just("test")
.map(
x -> {
System.out.println("trying again");
return Integer.parseInt(x);
});
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
.subscribe(...);
Kafka コンシューマーから (Spring ブートを使用):
これはオブザーバーへのサブスクリプションにすぎませんが、再試行ロジックは、この投稿で以前に説明したものです。
@KafkaListener(topics = "${kafka.config.topic}")
public void receive(String payload) {
log.info("received payload='{}'", payload);
service
.updateMessage(payload)
.subscribe(...)
.dispose();
}