2

私は非同期呼び出しを行っています.10秒後に1分間、つまり約6回の呼び出しが行われますが、問題はdelay特定のものに適用したいcondition

Observable
.just(listOfSomethings_Locally)
.take(1, TimeUnit.MINUTES)
.serialize()
.delaySubscription( // this is confusing part 
() ->
    Observable.just(listOfItems_Network).take(10,TimeUnit.SECONDS)
) 

私が望むのは、最初の呼び出しを除いてネットワーク呼び出しを 10 秒間遅らせ、10 秒後にネットワーク呼び出しをキャンセルすることです。したがって、1 分間に正確に 6 つの呼び出しが必要です。

編集

シナリオの混乱のため、シナリオを再定義します。

私が持っているのはローカルのドライバーの大きなリストであり、10 秒ごとに各ドライバーに要求を送信し、別のサブスクライバーをリッスンして、ドライバーが 10 秒以内にキャンセルしなかったかどうかを確認したいのですが、このプロセスは約 1 分間続きます。あるドライバーがキャンセルした場合、すぐに次のドライバーにリクエストを送信する必要があります

これまでに書かれたコード:

Observable.from(driversGot)
                .take(1,TimeUnit.MINUTES)
                .serialize()
                .map(this::requestRydeObservable) // requesting for single driver from driversGot (it's a network call)
                .flatMap(dif ->
                        Observable.amb(
                                kh.getFCM().driverCanceledRyde(), // listen for if driver cancel request returns integer
                                kh.getFCM().userRydeAccepted()) // listen for driver accept returns RydeAccepted object
                                .map(o -> {
                                    if (o instanceof Integer) {
                                        return new RydeAccepted();
                                    } else if (o instanceof RydeAccepted) {
                                        return (RydeAccepted) o;
                                    }
                                    return null;
                                }).delaySubscription(10,TimeUnit.SECONDS)
                )
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(fua -> {
                    if (fua == null) {
                        UiHelpers.showToast(context, "Invalid Firebase response");
                    } else if (!fua.getStatus()) { // ryde is canceled because object is empty
                        UiHelpers.showToast(context, "User canceled ryde");
                    } else { // ryde is accepted
                        UiHelpers.showToast(context, "User accepted ryde");
                    }
                }, t -> {
                    t.printStackTrace();
                    UiHelpers.showToast(context,"Error sending driver requests");
                }, UiHelpers::stopLoading);
4

3 に答える 3

2

コードに関するフィードバック

必要はなくtake、すぐに + すでにシリアルに発行されますserializejust

delaySubscription渡されたオブザーバブルがイベントを生成した後、それ以上のイベントは遅延されないため、奇妙な選択のようです(これはあなたのケースと矛盾します) 遅延購読

オプション #1、rx のみ

+残りのイベントの個々の遅延をdelay計算します(つまり、1番目は0秒遅延、2番目は1秒遅延、3番目は3秒遅延、...)

            AtomicLong counter = new AtomicLong(0);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .delay(item -> Observable.just(item).delay(counter.getAndIncrement(), TimeUnit.SECONDS))
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });        
        System.in.read();

オプション #2: レート制限

あなたのユースケースはレート制限に適しているようです。そのため、グアバから RateLimiter を使用できます。

            RateLimiter limiter = RateLimiter.create(1);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .map(r -> {
            limiter.acquire();
            return r;
        })
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });        
        System.in.read();

どちらも同様に機能します。

Tue Nov 15 11:14:34 EET 2016
1 Tue Nov 15 11:14:34 EET 2016
2 Tue Nov 15 11:14:35 EET 2016
3 Tue Nov 15 11:14:36 EET 2016
4 Tue Nov 15 11:14:37 EET 2016
5 Tue Nov 15 11:14:38 EET 2016
6 Tue Nov 15 11:14:39 EET 2016

レート リミッターは、たとえば 5 秒間のハンドリングなどのリクエストの場合により適切に機能し、次のリクエストが遅延を補って 10 秒間 1req/s の目標に到達するまでの速度を上げます。

于 2016-11-15T09:16:26.270 に答える
0

こんにちは、遅れて再試行を使用できますここでそれを行う方法があります

于 2016-11-15T00:29:38.293 に答える