3

300ミリ秒ごとに発行されるイベントを処理するための固定スレッドプールを作成し、プロセスに1000ミリ秒が必要であると想定しています。マルチスレッドは機能するが、再利用されるスレッドは 1 つだけだとします。

sleepTime を 300ms 未満に設定すると、処理スレッドが変更されますが、それは役に立ちません。

質問: 同時実行するにはどうすればよいですか? プログラムがスレッドを再利用するのはなぜですか?

前もって感謝します

public static void main(String[] args) throws InterruptedException {
    long sleepTime = 1000;
    ExecutorService e = Executors.newFixedThreadPool(3);

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long pT) {
            return Observable.just(pT).subscribeOn(Schedulers.from(e));
        }
    })
    .doOnNext(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    })
    .subscribe(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    });


    Thread.sleep(50000);
    e.shutdownNow();

}

ログ

i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1
4

3 に答える 3

0

その代わり

 .subscribeOn(Schedulers.computation())

試す

 .observeOn(Schedulers.computation())

Rx との同時実行性を試すために以前に行ったこの例は、例として非常にうまく機能します

   public class ObservableZip {

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async:", start, result));
}




public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}


public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " +  Thread.currentThread()
                                                               .getName());
                     })
                     .map(val -> "!");
  }

 }
于 2015-12-30T09:06:20.197 に答える
0

あなたのコードで私が理解していることから、プロデューサーはサブスクライバーよりも速い速度で生成しています。ただし、Observable<Long> interval(long interval, TimeUnit unit)実際にはサポートしていませんBackpressure。ドキュメントには、

この演算子は時間を使用するため、バックプレッシャーをサポートしていません。ダウンストリームでより遅い速度が必要な場合は、タイマーを遅くするか、{@link #onBackpressureDrop} などを使用する必要があります。

処理がプロデューサーよりも非常に遅い場合、サブスクライバー コードでできることは次のようなものです。

.subscribe(new Action1<Long>() {

    @Override
    public void call(Long pT) {
        e.submit(new Runnable() {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    }
});
于 2015-12-30T08:02:32.490 に答える