4

タイムリーに完了するにはマルチスレッドが必要な単純なプログラムを作成するために、Pivo​​tal の新しくリリースされたリアクター フレームワークを調べてみようと思いました。

フレームワークを理解し、それを使ってどのように使用されるかを理解するために、次のサンプル プロジェクトを作成しました。

Main.java:

package reactortest;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Main { 
    public static void main(String[] args) throws InterruptedException {
        try(AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MainConfiguration.class)) {
            MyProducer producer = context.getBean(MyProducer.class);
            producer.run();
        }
    }
}

MyProducer.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import reactor.core.Reactor;
import reactor.event.Event;

public class MyProducer {
    private final Reactor reactor;
    private final Integer messagesToPrint;
    private final CountDownLatch countDownLatch;

    public MyProducer(final Reactor reactor, final Integer messagesToPrint, CountDownLatch countDownLatch) {
        this.reactor = reactor;
        this.messagesToPrint = messagesToPrint;
        this.countDownLatch = countDownLatch;
    }

    public void run() throws InterruptedException {
        for(int i = 0; i < messagesToPrint; ++i) {
            reactor.notify(Event.wrap("String event: " + i));
        }

        countDownLatch.await();
        System.out.println("Finished. Remaining count is: " + countDownLatch.getCount());
    }
}

MyConsumer.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import reactor.event.Event;
import reactor.function.Consumer;

public class MyConsumer implements Consumer<Event<String>> {
    private final CountDownLatch countDownLatch;

    public MyConsumer(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void accept(Event<String> message) {
        System.out.println(message);
        countDownLatch.countDown();
    }
}

最後に、 MainConfiguration.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.spring.context.config.EnableReactor;

@Configuration
@EnableReactor
public class MainConfiguration {
    private final Integer MESSAGESTOPRINT = 10;

    @Autowired private Environment environment;

    @Bean
    public CountDownLatch countDownLatch() {
        CountDownLatch countDownLatch = new CountDownLatch(MESSAGESTOPRINT);
        return countDownLatch;
    }

    @Bean
    public Reactor reactor() {
        Reactor reactor = Reactors.reactor().env(environment).dispatcher(Environment.THREAD_POOL).randomEventRouting().get();
        reactor.on(consumer());
        return reactor;
    }

    @Bean
    public MyProducer producer() {
        MyProducer producer = new MyProducer(reactor(), MESSAGESTOPRINT, countDownLatch());
        return producer;
    }

    @Bean
    public MyConsumer consumer() {
        MyConsumer consumer = new MyConsumer(countDownLatch());
        return consumer;
    }
}

私の問題は、プログラムが決して終了しないことです。コンシューマーは、実行ごとに異なる情報も出力します。3 回連続して実行すると、次のように出力されます。

1st run:
Event{id=null, headers=null, replyTo=null, data=String event: 0}
Event{id=null, headers=null, replyTo=null, data=String event: 1}
Event{id=null, headers=null, replyTo=null, data=String event: 7}
Event{id=null, headers=null, replyTo=null, data=String event: 8}

2nd run:
Event{id=null, headers=null, replyTo=null, data=String event: 0}
Event{id=null, headers=null, replyTo=null, data=String event: 1}
Event{id=null, headers=null, replyTo=null, data=String event: 5}
Event{id=null, headers=null, replyTo=null, data=String event: 6}
Event{id=null, headers=null, replyTo=null, data=String event: 9}

3rd run:
Event{id=null, headers=null, replyTo=null, data=String event: 2}
Event{id=null, headers=null, replyTo=null, data=String event: 4}
Event{id=null, headers=null, replyTo=null, data=String event: 6}

これは、注釈が構成されているのではなくjavaconfigであり、外部とのやり取りがないことを除けば、これがここの例とどのように異なるかがわからないため、本当に明白な何かを見逃したに違いないようです。

4

1 に答える 1

5

この質問をしている間、私はコードを改良していましたが、最終的にはうまくいきました (いくつかの素晴らしいラバーダッキング)。質問を削除するのではなく、他の誰かが同じ問題に遭遇した場合に備えて投稿すると思いました。

上記のコードの問題は、リアクターのセットアップ中の randomEventRouting() 呼び出しです。このフラグを設定すると、ルーティング先のコンシューマーがランダムに選択されます。ディスパッチ先のコンシューマーを定義する特定のセレクター/キーを設定していないため、キーが提供されていない場合はすべてのコンシューマーが一致するため、イベントの一部が渡される舞台裏でデフォルトのコンシューマーが設定されていると想定します.

セレクターを受け入れるように reactor.on() を変更します。

reactor.on(Selectors.$(selector()), consumer());

セレクターは単純です:

@Bean
public String selector() {
    String selector = "My very special event";
    return selector;
}

このキーをプロデューサーに挿入し、reactor.notify() を呼び出すときに使用します。

reactor.notify(selector, Event.wrap("String event: " + i));

期待どおりに動作しました。

ほとんどのユーザーがキーを定義する (そしてそうすべきである) ため、これは非常にまれなケースだと思いますが、決してわかりません。:)

于 2013-12-05T12:52:34.017 に答える