4

これまで RxJava を使っていましたが、reactive stream 仕様に準拠しているため、projectreactor.io の reactor-core をいじり始めています。

次のテストでは、乱数を生成するホット フラックス (ConnectableFlux) を作成します。すぐに connect() すると、256 個の値がプリフェッチされます (実際には 258 個がログに表示されます)。サブスクライバーがしばらくしてからサブスクライブしないことをシミュレートするために、5 秒間待機します。

メイン スレッドが起動した後、RnApp は ConnectableFlux をサブスクライブしrandomNumberGenerator.subscribe(new RnApp());ます。次にRnApp.onSubscribe()が呼び出され、10 個の要素が要求されます。その後、java.lang.IllegalStateException: Queue full?!例外が発生します (RnApp.onError()呼び出されます)、なぜですか?

加入者:

public class RnApp implements Subscriber<Float>{

    private Subscription subscription;
    private List<Float> randomNumbers = new ArrayList<Float>();

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable err) {
        err.printStackTrace();
    }

    @Override
    public void onNext(Float f) {
        if(this.randomNumbers.size()>=10){
            this.subscription.cancel();
        }else{
            this.randomNumbers.add(f);
        }
    }

    @Override
    public void onSubscribe(Subscription subs) {
        this.subscription = subs;
        this.subscription.request(10);
    }
}

パブリッシャーテスト:

@Test
public void randomNumberReading() throws InterruptedException {

    CountDownLatch latch = new CountDownLatch(1);
    ConnectableFlux<Float> randomNumberGenerator = ConnectableFlux.<Float>create( (c) -> {
        SecureRandom sr = new SecureRandom();
        int i = 1;
        while(true){
            try {
                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("-----------------------------------------------------"+(i++));
            c.onNext(sr.nextFloat());
        }
    }).log().subscribeOn(Computations.concurrent()).publish();

    randomNumberGenerator.connect();

    Thread.sleep(5000);

    randomNumberGenerator.subscribe(new RnApp());

    latch.await();
}

ログ:

11:12:05.125 [main] DEBUG r.core.util.Logger$LoggerFactory - Using Slf4j logging framework
11:12:05.363 [concurrent-1] INFO  reactor.core.publisher.FluxLog -  onSubscribe(io.pivotal.literx.Part10SubscribeOnPublishOn$$Lambda$1/1586600255@29d4caeb)
11:12:05.371 [concurrent-1] INFO  reactor.core.publisher.FluxLog -  request(256)
-----------------------------------------------------1
11:12:06.000 [concurrent-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.39189225)
-----------------------------------------------------2
...
-----------------------------------------------------257
11:12:08.683 [concurrent-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.34729618)
-----------------------------------------------------258
11:12:08.697 [concurrent-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.7729547)
java.lang.IllegalStateException: Queue full?!
    at reactor.core.publisher.FluxPublish$State.onNext(FluxPublish.java:246)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:134)
    at reactor.core.publisher.FluxLog$LoggerBarrier.doNext(FluxLog.java:130)
    at reactor.core.subscriber.SubscriberBarrier.onNext(SubscriberBarrier.java:85)
    at reactor.core.subscriber.SubscriberWithContext.onNext(SubscriberWithContext.java:92)
    at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:132)
    at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:145)
    at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:114)
    at reactor.core.publisher.FluxGenerate$SubscriberProxy.request(FluxGenerate.java:245)
    at reactor.core.subscriber.SubscriberBarrier.doRequest(SubscriberBarrier.java:146)
    at reactor.core.publisher.FluxLog$LoggerBarrier.doRequest(FluxLog.java:160)
    at reactor.core.subscriber.SubscriberBarrier.request(SubscriberBarrier.java:135)
    at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:71)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onSubscribe(FluxSubscribeOn.java:129)
    at reactor.core.publisher.FluxLog$LoggerBarrier.doOnSubscribe(FluxLog.java:122)
    at reactor.core.subscriber.SubscriberBarrier.onSubscribe(SubscriberBarrier.java:67)
    at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:72)
    at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:67)
    at reactor.core.publisher.FluxSubscribeOn$SourceSubscribeTask.run(FluxSubscribeOn.java:363)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:919)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:883)
    at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:842)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
4

2 に答える 2

4

RxJava と同様に、 を使用している場合はcreate()、キャンセルとバックプレッシャーを自分で処理する必要があります。代わりに、標準演算子からジェネレーターを構築できます。

ConnectableFlux<Double> secureRandomFlux = Flux.using(
    () -> new SecureRandom(),
    sr -> Flux.interval(10, TimeUnit.MILLISECONDS)
          .map(v -> sr.nextDouble())
          .onBackpressureDrop()
    sr -> { }
).publish();
于 2016-05-03T09:41:14.043 に答える
3

create()コールバック呼び出しごとに 1 つの onNext が必要です。Flux.yieldまたは、ダウンストリームの状態 (バックプレッシャーまたはキャンセル) を処理するための追加の発行メソッドを提供するものを確認することもできます。または、Flux.generatewhich is like createbut is invoked をリクエストごとに 1 回使用onNextして、渡された要求を効果的に処理できるようにすることもできます。

これら 3 つのFluxジェネレーターは現在議論されているため、http://github.com/reactor/reactor-core/issuesでより良い代替案について議論できます。

Flux#delaySubscription を使用して、たとえば最新のスナップショット UnicastProcessor を使用して、公開のプリフェッチを防ぐこともできます。

 UnicastProcessor<Object> p = UnicastProcessor.create();
 flux.delaySubscription(p).publish(128).autoConnect().subscribe();

 //...
 p.onNext(new Object());
于 2016-05-03T09:50:21.007 に答える