エンド マシンが多数あり、それぞれに時折メッセージが送信されるか、少数のエンド マシンがあり、それぞれに頻繁にメッセージが送信されるかは、少し検討する必要があります。
多数のエンド マシンがある場合、エンド マシンごとに文字通り 1 つのスレッドを使用するのは、実際にそれらすべてのマシンにメッセージを常にストリーミングしない限り、少し過剰に聞こえます。特定の境界間でのみ成長するスレッドのプールを持つことをお勧めします。これを行うには、ThreadPoolExecutor を使用できます。メッセージを投稿する必要がある場合は、実際にメッセージを送信するエグゼキューターに実行可能ファイルを送信します。
Executor msgExec = new ThreadPoolExecutor(...);
public void sendMessage(final String machineId, byte[] message) {
msgExec.execute(new Runnable() {
public void run() {
sendMessageNow(machineId, message);
}
});
}
private void sendMessageNow(String machineId, byte[] message) {
// open connection to machine and send message, thinking
// about the case of two simultaneous messages to a machine,
// and whether you want to cache connections.
}
エンド マシンが数台しかない場合は、マシンごとに BlockingQueue を配置し、ブロッキング キューごとに次のメッセージを待機するスレッドを配置できます。この場合、パターンは次のようになります (テストされていない頭から離れた日曜日の朝のコードに注意してください)。
ConcurrentHashMap<String,BockingQueue> queuePerMachine;
public void sendMessage(String machineId, byte[] message) {
BockingQueue<Message> q = queuePerMachine.get(machineId);
if (q == null) {
q = new BockingQueue<Message>();
BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
if (prev != null) {
q = prev;
} else {
(new QueueProessor(q)).start();
}
}
q.put(new Message(message));
}
private class QueueProessor extends Thread {
private final BockingQueue<Message> q;
QueueProessor(BockingQueue<Message> q) {
this.q = q;
}
public void run() {
Socket s = null;
for (;;) {
boolean needTimeOut = (s != null);
Message m = needTimeOut ?
q.poll(60000, TimeUnit.MILLISECOND) :
q.take();
if (m == null) {
if (s != null)
// close s and null
} else {
if (s == null) {
// open s
}
// send message down s
}
}
// add appropriate error handling and finally
}
}
この場合、そのマシンへのメッセージが 60 秒以内に到着しない場合、接続を閉じます。
代わりに JMS を使用する必要がありますか? さて、これがあなたにとって複雑に聞こえるかどうかを検討する必要があります. 私の個人的な感覚では、特別なフレームワークを保証するほど複雑な作業ではありません。しかし、私は意見が異なると確信しています。
PS実際には、これを見て、おそらくキューをスレッドオブジェクト内に配置し、マシンID->スレッドオブジェクトをマップするだけです。とにかく、あなたはアイデアを得る。