0

たとえば、次の実行可能な Java コードがあります。

これは、プロデューサーと複数の並行コンシューマーに関するものです。これらのコンシューマは、時間のかかるジョブを実行しており、並行して実行しています。

このユースケースが rx-java と一致するかどうか、また rx-java でどのように書き換えるかが問題です。

public class DemoInJava {
    public static void main(String[] args) {

        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        AtomicBoolean done = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            int offset = 0;
            int limit = 10;
            while (true) {
                if (queue.isEmpty()) {
                    if (offset < 100) {// there is 100 records in db
                        fetchDataFromDb(offset, limit).forEach(e -> queue.add(e));
                        offset = offset + limit;
                    } else {
                        done.set(true);
                        break; // no more data
                    }
                } else {
                    try {
                        Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way.
                    } catch (InterruptedException e) {
                    }
                }
            }
        });

        List<Thread> consumers = IntStream.range(0, 5).boxed().map(c -> new Thread(() ->
        {
            while (true) {
                Integer i = queue.poll();
                if (i != null) {
                    longRunJob(i);
                } else {
                    if (done.get()) {
                        break;
                    } else {
                        try {
                            Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way.
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }
        })).collect(Collectors.toList());

        producer.start();
        consumers.forEach(c -> c.start());
    }

    private static List<Integer> fetchDataFromDb(int offset, int limit) {
        return IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList());
    }

    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
    }
}

出力は次のとおりです。

....
Thread-1 long run job of 7
Thread-1 long run job of 8
Thread-1 long run job of 9
Thread-4 long run job of 10
Thread-4 long run job of 16
Thread-10 long run job of 14
Thread-5 long run job of 15
Thread-8 long run job of 13
Thread-7 long run job of 12
Thread-9 long run job of 11
Thread-10 long run job of 19
Thread-4 long run job of 18
Thread-3 long run job of 17
....
4

1 に答える 1

1

見てみましょう...まず、コード:

package rxtest;

import static io.reactivex.Flowable.generate;
import static io.reactivex.Flowable.just;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.reactivex.Emitter;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

public class Main {

    private static final Scheduler SCHEDULER = Schedulers.from(Executors.newFixedThreadPool(10));

    private static class DatabaseProducer {
        private int offset = 0;
        private int limit = 100;

        void fetchDataFromDb(Emitter<List<Integer>> queue) {
            System.out.println(Thread.currentThread().getName() + " fetching "+offset);
            queue.onNext(IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList()));
            offset += limit;
        }
    }

    public static void main(String[] args) {
        generate(new DatabaseProducer()::fetchDataFromDb)
        .subscribeOn(Schedulers.io())
        .concatMapIterable(list -> list, 1) // 1 call, no prefetch
        .flatMap(item -> 
                just(item)
                .doOnNext(i -> longRunJob(i))
                .subscribeOn(SCHEDULER)
                , 10) // don't subscribe to more than 10 at a time
        .take(1000)
        .blockingSubscribe();
    }

    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

クラスDatabaseProducerは、現在のオフセットが必要なため、単純に値のステートフル プロデューサーです。generate呼び出しは次のように置き換えられる可能性があるため、厳密には必要ありません。

        generate(() -> 0, (offset,e) -> {
            e.onNext(IntStream.range(offset, offset + 100).boxed()
                       .collect(Collectors.toList()));
            return offset + 100;
        }, e -> {});

しかし、それはほとんど読みやすいものではありません。

and は、処理するための空きスレッドがなくても、実装に依存する制限まで、observables/ flowablesをプリフェッチcocatMapおよびflatMapプリサブスクライブできることに注意してください。それらは単にスケジューラでキューに入れられます。各呼び出しの数字は、concatMap必要な場合にのみデータベースからフェッチするため、必要な制限を表します (ここに 2 を入れると、読み過ぎてしまう可能性がありますが、パイプライン)。

Cpu バウンドの計算をSchedulers.computation()実行する場合は、JVM が実行されているシステムの CPU の数に自動構成され、コードベースの他の部分から使用できるため、使用することをお勧めします。プロセッサを過負荷にしないでください。

于 2016-11-20T10:31:50.953 に答える