1

Java 7、グラスフィッシュ 3.1.2

入力は次のようなメッセージです。

public class Message {

    private final String contextId;
    private final String name;
    ...
}

このメッセージはワーカーによって処理される必要があります。新しい contextId を持つメッセージの場合、新しいスレッドを開始する必要があります。すでに存在する contextId には、既存のスレッドを使用します。既存のスレッドは、同じ contextId シーケンシャルでメッセージを処理する必要があります。

Hier 私の最後の、動作していない、Worker バージョン。

@Stateless
@LocalBean
public class Worker {

private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>();
@EJB
private Worker worker;

@Asynchronous
public void work(Message message) {
    System.out.println(Thread.currentThread().getName() + ": A  message: " + message.toString()+ " should be processed");
    Future<Result> sameContext = MAP.get(message.getContextId());
    if (sameContext != null) {
        waitForSameContextId(message, sameContext);
    }
    MAP.put(message.getContextId(), worker.doWork(message));
}

@Asynchronous
public Future<Result> doWork(Message message) {
    System.out.println(Thread.currentThread().getName() + ": Processing the message:  " + message.toString());

    AsyncResult<Result> asyncResult = new AsyncResult<>(new Result());
    try {
        Thread.sleep(15000);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }

    MAP.remove(message.getContextId());  //We are done removing 
    System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed");
    return asyncResult;
}

private void waitForSameContextId(Message message, Future<Result> result) {
    try {
        System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString()
                + " is already in work, blocking Thread until it is finished");
        Result get = result.get(); //blocks thread
    } catch (InterruptedException | ExecutionException ex) {
        ex.printStackTrace();
        // Do some failure management
    }
}

テストクラス:

public class MessageReceiver {

private static String ID = "#########";

@EJB
private Worker worker;

public void receive(Message message) {

    worker.work(message);
}

@PostConstruct
void init() {

    receive(new Message(ID, "message 1"));
    receive(new Message(ID, "message 2"));
    receive(new Message(ID, "message 3"));
 ...
 }
4

1 に答える 1

0

CDI @Observes パターンを使用して @Asynchronous サービスを呼び出すことができると思います。詳細な説明については、このリンクをご覧ください http://www.devchronicles.com/2011/12/javaee-revisits-design-patterns_28.html

于 2013-04-28T21:21:57.197 に答える