0

Java の JeroMQ で IPC エンドポイントを使用して、私の別のアプリケーションと通信する tomcat7 アプリケーションがあります。クライアントサーバースキームがあり、クライアントはサーバーからの応答をしばらく待ちます。応答を受信しない場合、最初は再試行せずに失敗します。

コードは以下です

@Override
public List<Result> call() throws Exception {
    final List<Result> results = new LinkedList<>();
    try {
        for (DTO dto : messages) {
            Message m = MessageHelper.MessageMapper(dto);

            Thread.sleep(dto.getDelayBeforeSend());
            final Result mtresult = send(dto);
            results.add(result);
        }
    } catch (RuntimeException e) {
        LOGGER.error("Flow => Uncaught Exception: {}", e.getMessage());
        LOGGER.debug("Flow => Uncaught Exception: ", e);
        Thread t = Thread.currentThread();
        t.getUncaughtExceptionHandler().uncaughtException(t, e);
    }
    return results;
}

private Result send(Message m) {
    ZMQ.Socket client = MQSocketFactory.getMQSocket(serverEndpoint).createRequester();
    try {
        final byte[] DTO = Helper.serializeMessage(m);
        int retriesLeft = 1;
        Result result = new Result(MessageConstants.MESSAGE_FAIL);

        while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {

            client.send(DTO, 0);
            int expect_reply = 1;

            while (expect_reply > 0) {

                ZMQ.PollItem items[] = { new ZMQ.PollItem(client, Poller.POLLIN) };
                int rc = ZMQ.poll(items, 3000);
                if (rc == -1) break; // Interrupted

                if (items[0].isReadable()) {
                    final byte[] reply = client.recv(0);
                    if (reply == null) break;
                    result = new Result(new String(reply));
                    if (result.isSuccessful()) {
                        LOGGER.trace("Server replied OK. Result: [{}]", result);
                        retriesLeft = 0;
                        expect_reply = 0;
                    } else LOGGER.error("Malformed reply from server: [{}]", result);

                } else if (--retriesLeft == 0) {
                    LOGGER.error("Server:[{}] seems to be offline, abandoning sending message [{}]!", serverEndpoint, m);
                    break;
                } else {
                    LOGGER.warn("No response from server, retrying...");
                    client = MQSocketFactory.getMQSocket(serverEndpoint).resetRequester(client);
                    client.send(DTO, 0);
                }
            }
        }
        return result;
    } finally {
        MQSocketFactory.getMQSocket(serverEndpoint).destroyRequester(client);

    }
}

MQSocketFactory クラスは次のようになります。

public final class MQSocketFactory {

private static final Map<String, MQSocket> store = new HashMap<String, MQSocket>();

private static final Logger LOGGER = LoggerFactory.getLogger(MQSocketFactory.class);

public static MQSocket getMQSocket(String endpointName) {
    synchronized (store) {
        MQSocket result = store.get(endpointName);
        if (result == null) {
            result = new MQSocket(endpointName);
            store.put(endpointName, result);
        }
        return result;
    }
}

public static final class MQSocket {

    private final String endpoint;
    private final ZMQ.Context ctx;

    private MQSocket(String endpointName) {
        this.endpoint = endpointName;
        this.ctx = ZMQ.context(1);
    }

    public ZMQ.Socket createRequester() {
        ZMQ.Socket client = null;
        try {
            client = ctx.socket(ZMQ.REQ);
            assert (client != null);
            client.connect(endpoint);
        } catch (Exception e) {
            LOGGER.error("Error: {}", e.getMessage());
            LOGGER.error("Error: {}", e);
        }
        return client;
    }

    public ZMQ.Socket resetRequester(ZMQ.Socket socket) {
        destroyRequester(socket);
        return createRequester();
    }

    public void destroyRequester(ZMQ.Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception e) {
                LOGGER.error("Error: {}", e.getMessage());
                LOGGER.debug("Error: {}", e);
            }
        }
    }

    public ZMQ.Context getContext() {
        return ctx;
    }

    // Responder Unit
    private ZMQ.Socket responder;

    public ZMQ.Socket createResponder() {
        if (responder == null) {
            this.responder = ctx.socket(ZMQ.REP);
            responder.bind(endpoint);
        }
        return responder;
    }

    public ZMQ.Socket resetResponder() {
        destroyResponder();
        return createResponder();
    }

    public void destroyResponder() {
        try {
            responder.close();
        } catch (Exception e) {
            LOGGER.error("Error: {}", e.getMessage());
            LOGGER.debug("Error: {}", e);
        }
    }

}

}

IOExcpetion Too Many file Open に関するこの特定の問題を回避するために、リクエストが完了した後にすべてのソケットが閉じられるように、これを具体的に行いました。ただし、ごくまれにこの問題が発生し、その理由がわかりません。アプリケーションは、ほぼ同じ負荷で何日も動作し、すべてが正常に機能している可能性がありますが、ある時点で例外がスローされ始め、その理由はわかりません。

また、Tomcat7 で ulimit を増やす方法はありますか? 現在1024です。

4

0 に答える 0