Java の JeroMQ で IPC エンドポイントを使用して、私の別のアプリケーションと通信する tomcat7 アプリケーションがあります。クライアントサーバースキームがあり、クライアントはサーバーからの応答をしばらく待ちます。応答を受信しない場合、最初は再試行せずに失敗します。
コードは以下です
@Override
public List<Result> call() throws Exception {
final List<Result> results = new LinkedList<>();
try {
for (DTO dto : messages) {
Message m = MessageHelper.MessageMapper(dto);
Thread.sleep(dto.getDelayBeforeSend());
final Result mtresult = send(dto);
results.add(result);
}
} catch (RuntimeException e) {
LOGGER.error("Flow => Uncaught Exception: {}", e.getMessage());
LOGGER.debug("Flow => Uncaught Exception: ", e);
Thread t = Thread.currentThread();
t.getUncaughtExceptionHandler().uncaughtException(t, e);
}
return results;
}
private Result send(Message m) {
ZMQ.Socket client = MQSocketFactory.getMQSocket(serverEndpoint).createRequester();
try {
final byte[] DTO = Helper.serializeMessage(m);
int retriesLeft = 1;
Result result = new Result(MessageConstants.MESSAGE_FAIL);
while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {
client.send(DTO, 0);
int expect_reply = 1;
while (expect_reply > 0) {
ZMQ.PollItem items[] = { new ZMQ.PollItem(client, Poller.POLLIN) };
int rc = ZMQ.poll(items, 3000);
if (rc == -1) break; // Interrupted
if (items[0].isReadable()) {
final byte[] reply = client.recv(0);
if (reply == null) break;
result = new Result(new String(reply));
if (result.isSuccessful()) {
LOGGER.trace("Server replied OK. Result: [{}]", result);
retriesLeft = 0;
expect_reply = 0;
} else LOGGER.error("Malformed reply from server: [{}]", result);
} else if (--retriesLeft == 0) {
LOGGER.error("Server:[{}] seems to be offline, abandoning sending message [{}]!", serverEndpoint, m);
break;
} else {
LOGGER.warn("No response from server, retrying...");
client = MQSocketFactory.getMQSocket(serverEndpoint).resetRequester(client);
client.send(DTO, 0);
}
}
}
return result;
} finally {
MQSocketFactory.getMQSocket(serverEndpoint).destroyRequester(client);
}
}
MQSocketFactory クラスは次のようになります。
public final class MQSocketFactory {
private static final Map<String, MQSocket> store = new HashMap<String, MQSocket>();
private static final Logger LOGGER = LoggerFactory.getLogger(MQSocketFactory.class);
public static MQSocket getMQSocket(String endpointName) {
synchronized (store) {
MQSocket result = store.get(endpointName);
if (result == null) {
result = new MQSocket(endpointName);
store.put(endpointName, result);
}
return result;
}
}
public static final class MQSocket {
private final String endpoint;
private final ZMQ.Context ctx;
private MQSocket(String endpointName) {
this.endpoint = endpointName;
this.ctx = ZMQ.context(1);
}
public ZMQ.Socket createRequester() {
ZMQ.Socket client = null;
try {
client = ctx.socket(ZMQ.REQ);
assert (client != null);
client.connect(endpoint);
} catch (Exception e) {
LOGGER.error("Error: {}", e.getMessage());
LOGGER.error("Error: {}", e);
}
return client;
}
public ZMQ.Socket resetRequester(ZMQ.Socket socket) {
destroyRequester(socket);
return createRequester();
}
public void destroyRequester(ZMQ.Socket socket) {
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
LOGGER.error("Error: {}", e.getMessage());
LOGGER.debug("Error: {}", e);
}
}
}
public ZMQ.Context getContext() {
return ctx;
}
// Responder Unit
private ZMQ.Socket responder;
public ZMQ.Socket createResponder() {
if (responder == null) {
this.responder = ctx.socket(ZMQ.REP);
responder.bind(endpoint);
}
return responder;
}
public ZMQ.Socket resetResponder() {
destroyResponder();
return createResponder();
}
public void destroyResponder() {
try {
responder.close();
} catch (Exception e) {
LOGGER.error("Error: {}", e.getMessage());
LOGGER.debug("Error: {}", e);
}
}
}
}
IOExcpetion Too Many file Open に関するこの特定の問題を回避するために、リクエストが完了した後にすべてのソケットが閉じられるように、これを具体的に行いました。ただし、ごくまれにこの問題が発生し、その理由がわかりません。アプリケーションは、ほぼ同じ負荷で何日も動作し、すべてが正常に機能している可能性がありますが、ある時点で例外がスローされ始め、その理由はわかりません。
また、Tomcat7 で ulimit を増やす方法はありますか? 現在1024です。