Java 7、グラスフィッシュ 3.1.2
入力は次のようなメッセージです。
public class Message {
private final String contextId;
private final String name;
...
}
このメッセージはワーカーによって処理される必要があります。新しい contextId を持つメッセージの場合、新しいスレッドを開始する必要があります。すでに存在する contextId には、既存のスレッドを使用します。既存のスレッドは、同じ contextId シーケンシャルでメッセージを処理する必要があります。
Hier 私の最後の、動作していない、Worker バージョン。
@Stateless
@LocalBean
public class Worker {
private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>();
@EJB
private Worker worker;
@Asynchronous
public void work(Message message) {
System.out.println(Thread.currentThread().getName() + ": A message: " + message.toString()+ " should be processed");
Future<Result> sameContext = MAP.get(message.getContextId());
if (sameContext != null) {
waitForSameContextId(message, sameContext);
}
MAP.put(message.getContextId(), worker.doWork(message));
}
@Asynchronous
public Future<Result> doWork(Message message) {
System.out.println(Thread.currentThread().getName() + ": Processing the message: " + message.toString());
AsyncResult<Result> asyncResult = new AsyncResult<>(new Result());
try {
Thread.sleep(15000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
MAP.remove(message.getContextId()); //We are done removing
System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed");
return asyncResult;
}
private void waitForSameContextId(Message message, Future<Result> result) {
try {
System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString()
+ " is already in work, blocking Thread until it is finished");
Result get = result.get(); //blocks thread
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
// Do some failure management
}
}
テストクラス:
public class MessageReceiver {
private static String ID = "#########";
@EJB
private Worker worker;
public void receive(Message message) {
worker.work(message);
}
@PostConstruct
void init() {
receive(new Message(ID, "message 1"));
receive(new Message(ID, "message 2"));
receive(new Message(ID, "message 3"));
...
}