0

次の方法で、メンテナンス タスクに rxjava を使用しています。

定期的なメンテナンスが必要なクラスでは、次の静的サブスクリプションを使用します。これにより、クラスがメモリにロードされたときに Observable が初めて起動され、その後、指定された定期的な間隔で起動されます。

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code

                return Observable.just(null);
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe();

ここで、メンテナンスの結果を UI に報告したい状況があります。

通常、次のスキーマを使用します

    Observable.create(new Observable.OnSubscribe<String>() {

        @Override public void call(Subscriber<? super String> subscriber) {

            // some code

            subscriber.onNext(result);
            subscriber.onCompleted();            }

    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {

                @Override public void call(String result) {

                    // write to the UI

                }

            });

しかし、ここには一度だけ実行される Observable があります。

定期的に実行される Observable の場合、Subscriber で Action を呼び出す方法が見つからなかったため、subscriber.onNext() を使用して結果を渡すことができました。オブザーバブルに適した署名がないように見えます。これは、timer() から時間がかかり、同時にアクションでサブスクライブできます。しかし、rxjavaを知っていると、トリックがあると確信しています;-)

zip を使用して Timer Observable と 1 回限りの Observable を圧縮する (基本的には 2 つのバージョンを一緒に圧縮する) こともできますが、動作が少し異なるため、最初の構造を使用します。

--

次の方法で、両方のバージョンを 1 つにマージしようとしました。

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code // stays here to ensure there is no concurrency while executing

                final String result = "result"; // I store the result in a final variable after some code has been finished

                return Observable.create(new Observable.OnSubscribe<String>() {
                    @Override public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext(result); // then I use it in a new Observable and emit it
                        subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
                    }
                });

            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe(new Action1<String>() {
            @Override public void call(String result) {
                // so I can finally consume the result on the UI thread
            }
        });

「null」Observable を作成して発行する代わりに、サブスクライバーに結果を送信できるものを作成します。

かなり面倒ですが、これでうまくいくはずですよね?もっと簡単な解決策はありますか?あなたの考えは何ですか?

4

2 に答える 2

0

この質問に答えを追加するのはしばらく前のことです。ただし、それは他の誰かを助けるかもしれません。オブザーバブルを作成して、たとえば3秒ごとに実行したいことを理解しています。以下の私の例を見つけてください( Observable.just のような既製のオペレーターを発行するか、 Observable.create operator を使用して独自のオブザーバブルを作成することもできます):

Observable.interval(3,TimeUnit.SECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Long aLong) throws Exception {

            return Observable.just("Hello " + Thread.currentThread().getName()+aLong);
           /*
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    try{
                        String data = fetchData("http://www.yahoo.com");
                        emitter.onNext(data.length()+"");
                        emitter.onComplete();
                    }catch (Exception e){
                        emitter.onError(e);
                    }
                }
            });
            */
        }
    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("RxResult","--  " + Thread.currentThread().getName() + " -- " +s);
        }
    });

public String fetchData(String url) throws IOException {
    BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()));
    String inputLine;
    String result = "";

    while((inputLine = in.readLine()) != null){
        result += inputLine;
    }

    return result.length()+"";
}
于 2018-05-15T13:44:02.277 に答える