2

sys/socket.hヘッダーを使用して C++ で基本サーバーのコードを作成しました。サーバーを実行してリクエストに応答するメソッドを以下に示します。Server::keep_server_メンバー変数 (が true に設定されている) である限り、コードは連続ループで実行されます。JSON 要求を受信すると、マップから適切なハンドラーを検索し、それを呼び出して JSON 応答を生成し、send/ writes をクライアントに返します。

    /**
    * Run server, listen for client requests, send responses.
    */
    void Run() {
        int sock_fd, client_sock;
        struct sockaddr_in server{}, client{};

        if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
            throw std::runtime_error("Socket creation failed.");
        }

        int enable = 1;
        setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));

        server.sin_family = AF_INET;
        server.sin_addr.s_addr = INADDR_ANY;
        server.sin_port = htons(port_);

        // Bind socket.
        if (bind(sock_fd, (struct sockaddr *) &server, sizeof(server)) < 0) {
            throw std::runtime_error("Socket bind failed.");
        }

        // Listen and accept incoming connections.
        listen(sock_fd, max_clients_);
        listening_ = true;
        int c = sizeof(struct sockaddr_in);

        // Char buffer to which clients requests will be written.
        char client_req[2048];
        // JSON parser and serializer.
        JSONCPP_STRING parse_err;

        while ((client_sock = accept(sock_fd,
                                     (struct sockaddr *) &client,
                                     (socklen_t *) &c))
               && keep_server_)
        {
            // JSON objects to hold parsed request and serialized response.
            Json::Value json_req, json_resp;
            json_resp["SUCCESS"] = true;

            if (client_sock < 0) {
                throw std::runtime_error("Socket accept failed.");
            }

            bzero(client_req, 2048);
            long req_size = recv(client_sock, client_req, 2048, 0);

            if (req_size > 0) {
                std::string client_req_str(client_req);
                // Read clientReq as JSON first, write response to json_resp.
                if (reader_->parse(client_req_str.c_str(),
                                   client_req_str.c_str() +
                                   client_req_str.length(),
                                   &json_req, &parse_err))
                {
                    try {
                        // Get JSON response.
                        json_resp = ProcessRequest(json_req);
                    } catch (const std::exception &ex) {
                        // If json parsing failed.
                        json_resp["SUCCESS"] = false;
                        json_resp["ERRORS"] = std::string(ex.what());
                    }
                } else {
                    // If json parsing failed.
                    json_resp["SUCCESS"] = false;
                    json_resp["ERRORS"] = std::string(parse_err);
                }

                std::string resp = Json::writeString(writer_, json_resp);
                if(send(client_sock, strdup(resp.c_str()), MAX_SOCKET_WRITE, 0) == -1)
                    throw std::runtime_error("Socket write failed.");
            } else if (req_size == -1) {
                throw std::runtime_error("Socket receive failed.");
            }
        }

        listening_ = false;
        close(sock_fd);
    }

このコードは、数百サイクルの要求と応答に対して意図したとおりに機能し、その時点でsendor write(両方で試しましたが、それぞれ同じ問題が発生します) が返さ-1れ、プロセスが終了します。

長時間作業した後、ソケットの書き込みが突然失敗するのはなぜですか? これにどのように対処すればよいですか?

編集: 明確にするために、すべてのクライアント/サーバー アクティビティはローカルで実行されています (つまり、それらは私のマシン上の 2 つの異なるスレッドです)。

編集 2:errnoは 14 です。これは明らかに「不正なファイル記述子」です。

編集 3: 別のユーザーは、最小限の再現可能な例を提供することを推奨しました。エラーは、サーバーにリクエストを繰り返し送信することによって引き起こされるようです。

これを実現するために、まず、特定のクラスをテンプレート パラメーターとして受け取るテンプレート クラスであるサーバー クラスの完全なコードを提供します。std::mem_fnサーバーは、その構成において、特定のコマンド タイプにバインドするクラスからのいくつかの とともに、このクラスのインスタンスを渡されます。その後、サーバーは、提供されたテンプレート パラメーター クラスのインスタンスで関連するハンドラーを呼び出して、要求を処理します。

[SERVER.H]
#pragma once
#ifndef CHORD_FINAL_SERVER
#define CHORD_FINAL_SERVER

/// When reading from / writing to sockets, it's best to just read/write as many chars as possible.
/// We'll just define this for as a macro.
#define MAX_SOCKET_WRITE 999999

#include <json/json.h>
#include <map>
#include <string>
#include <functional>
#include <iostream>
#include <cstring>
#include <netinet/in.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <cerrno>
#include <functional>

/**
 * A simple server which responds to requests using a pre-defined set of methods.
 *
 * @tparam RequestHandler Type of the method responding to requests.
 * @tparam RequestClass The class for which Request Handler is a method.
 */
template<class RequestHandler, class RequestClass>
class Server {
public:
    /**
    * Initialize server object. (Does not run server or listen for reqs.)
    *
    * @param port Port on which to run server.
    * @param commands A map of request strings (sent by clients) to function ptrs
    * to handle them.
    */
    Server(int port, int max_clients,
           const std::map<std::string, RequestHandler> *commands,
           RequestClass *request_class_inst) :
            port_(port),
            max_clients_(max_clients),
            commands_(*commands),
            request_class_inst_(request_class_inst),
            keep_server_(true),
            listening_(false),
            reader_((new Json::CharReaderBuilder)->newCharReader()) {}

    /**
    * Run server, listen for client requests, send responses.
    */
    void Run() {
        int sock_fd, client_sock;
        struct sockaddr_in server{}, client{};

        if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
            throw std::runtime_error("Socket creation failed.");
        }

        int enable = 1;
        setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));

        server.sin_family = AF_INET;
        server.sin_addr.s_addr = INADDR_ANY;
        server.sin_port = htons(port_);

        // Bind socket.
        if (bind(sock_fd, (struct sockaddr *) &server, sizeof(server)) < 0) {
            throw std::runtime_error("Socket bind failed.");
        }

        // Listen and accept incoming connections.
        if(listen(sock_fd, max_clients_) < 0)
            throw std::runtime_error("Listen failed");

        listening_ = true;
        int c = sizeof(struct sockaddr_in);

        // Char buffer to which clients requests will be written.
        char client_req[2048];
        // JSON parser and serializer.
        JSONCPP_STRING parse_err;

        while ((client_sock = accept(sock_fd,
                                     (struct sockaddr *) &client,
                                     (socklen_t *) &c))
               && keep_server_)
        {
            // JSON objects to hold parsed request and serialized response.
            Json::Value json_req, json_resp;
            json_resp["SUCCESS"] = true;

            if (client_sock < 0) {
                throw std::runtime_error("Socket accept failed.");
            }

            bzero(client_req, 2048);
            long req_size = recv(client_sock, client_req, 2048, 0);

            if (req_size > 0) {
                std::string client_req_str(client_req);
                // Read clientReq as JSON first, write response to json_resp.
                if (reader_->parse(client_req_str.c_str(),
                                   client_req_str.c_str() +
                                   client_req_str.length(),
                                   &json_req, &parse_err))
                {
                    try {
                        // Get JSON response.
                        json_resp = ProcessRequest(json_req);
                    } catch (const std::exception &ex) {
                        // If json parsing failed.
                        json_resp["SUCCESS"] = false;
                        json_resp["ERRORS"] = std::string(ex.what());
                    }
                } else {
                    // If json parsing failed.
                    json_resp["SUCCESS"] = false;
                    json_resp["ERRORS"] = std::string(parse_err);
                }

                std::string resp = Json::writeString(writer_, json_resp);
                errno = 0;
                if(send(client_sock, strdup(resp.c_str()), MAX_SOCKET_WRITE, 0) == -1) {
                    std::cout << "ERRNO " << errno << std::endl;
                    throw std::runtime_error("Socket write failed.");
                }
            } else if (req_size == -1) {
                throw std::runtime_error("Socket receive failed.");
            }

            if(close(client_sock) < 0)
                throw std::runtime_error("Socket close error");
        }

        listening_ = false;
        close(sock_fd);
    }

    /**
    * Run server as daemon.
    */
    void RunInBackground()
    {
        std::thread t([this] { Run(); });
        t.detach();
    }

    /**
    * Kill server, whether it runs in a detached thread or the main one.
    */
    void Kill()
    {
        keep_server_ = false;
    }

    /// Set to true when server has begun listening.
    bool listening_;

protected:
    /// Since the commands in commands are methods, this is the "this" we pass to them.
    RequestClass *request_class_inst_;

    /// Port on which server runs.
    const int port_;

    /// Maximum clients for which server will listen.
    const int max_clients_;

    /// When "true", server thread will continue running in background. When set
    /// to 0, server dies.
    bool keep_server_;

    /// A map of potential server commands (sent by clients) to RequestHandlers
    /// (func ptrs).
    std::map<std::string, RequestHandler> commands_;

    /**
    * Take a client's JSON request, generate an appropriate JSON response.
    *
    * @param request JSON object outlining request from a client to this server.
    * @return a JSON object to be transmitted as a response to the client's req,
    *          with a key "SUCCESS" set to either 1 or 0 to indicate whether
    *          request was valid and succesffully executed, along with any
    *          other relevant return info from the request.
    */
    Json::Value ProcessRequest(Json::Value request)
    {
        Json::Value response;
        std::string command = request["COMMAND"].asString();

        // If command is not valid, give a response with an error.
        if(commands_.find(command) == commands_.end()) {
            response["SUCCESS"] = false;
            response["ERRORS"] = "Invalid command.";
        }
            // Otherwise, run the relevant handler.
        else {
            RequestHandler handler = commands_.at(command);
            response = handler(*request_class_inst_, request);
        }

        return response;
    }

private:
    /// Reads JSON.
    const std::unique_ptr<Json::CharReader> reader_;
    /// Writes JSON.
    Json::StreamWriterBuilder writer_;
};

#endif

また、基本的なクライアント クラスを次のように定義します。

[CLIENT.H]
#pragma once
#ifndef CHORD_FINAL_CLIENT_H
#define CHORD_FINAL_CLIENT_H

#include <json/json.h>

class Client {
public:
    /**
     * Constructor, initializes JSON parser and serializer.
     */
    Client();

    /**
     * Send a request to a server at a given port-IP pairing.
     *
     * @param recipient_ip IP to send request to.
     * @param port Port to send request to.
     * @param request JSON object detailing request to server.
     * @return JSON response from server or throw an error.
     */
    Json::Value make_request(const char *recipient_ip, int port, const Json::Value &request);

    /**
     * Determine whether or not a node is alive.
     *
     * @param ip_addr IP address of node to ping.
     * @param port Port of node to ping
     * @return Value of whether or not a server is running on ip_addr:port.
     */
    static bool is_alive(const char *ip_addr, int port);

protected:
    /// Reads JSON.
    const std::unique_ptr<Json::CharReader> reader_;
    /// Writes JSON.
    Json::StreamWriterBuilder writer_;
};


#endif //CHORD_FINAL_CLIENT_H
[CLIENT.CPP]
#include "client.h"
#include <iostream>

#include <arpa/inet.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <thread>

Client::Client()
        : reader_((new Json::CharReaderBuilder)->newCharReader())
{}

Json::Value Client::make_request(const char *recipient_ip, int port,
                                 const Json::Value &request)
{
    int sock_fd;
    struct sockaddr_in destination{};

    if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        throw std::runtime_error("Socket creation failed");
    }
    destination.sin_addr.s_addr = inet_addr(recipient_ip);
    destination.sin_family = AF_INET;
    destination.sin_port = htons(port);

    if (connect(sock_fd, (struct sockaddr *) &destination, sizeof(destination)) < 0) {
        close(sock_fd);
        throw std::runtime_error("Host " + std::string(recipient_ip) +
                                 + ":" + std::to_string(port) + " down.");
    }

    std::string serialized_req = Json::writeString(writer_, request);
    if (send(sock_fd, serialized_req.c_str(), strlen(serialized_req.c_str()), 0) < 0) {
        close(sock_fd);
        throw std::runtime_error("Socket write failed.");
    }

    struct timeval tv;
    tv.tv_sec = 1;
    tv.tv_usec = 0;
    setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);

    char resp_buff[2048];
    if (recv(sock_fd, resp_buff, 2048, 0) < 0) {
        throw std::runtime_error("Socket receive error.");
    }

    Json::Value json_resp;
    JSONCPP_STRING parse_err;
    std::string resp_str(resp_buff);
    if (reader_->parse(resp_str.c_str(), resp_str.c_str() + resp_str.length(),
                      &json_resp, &parse_err))
    {
        close(sock_fd);
        return json_resp;
    }

    throw std::runtime_error("Error parsing response.");
}

bool Client::is_alive(const char *ip_addr, int port)
{
    int sock_fd;
    struct sockaddr_in destination{};

    if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        throw std::runtime_error("Socket creation failed");
    }
    destination.sin_addr.s_addr = inet_addr(ip_addr);
    destination.sin_family = AF_INET;
    destination.sin_port = htons(port);

    if (connect(sock_fd, (struct sockaddr *) &destination, sizeof(destination)) < 0) {
        return false;
    }

    return true;
}

私の問題を再現するには、JSON リクエストを処理するように設計されたメソッドを使用して、基本的なサーバー タイプを定義するだけで済みます。

/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
    /**
     * Initialize class with value n to add sub from input values.
     *
     * @param n Value to add/sub from input values.
     */
    explicit RequestClass(int n) : n_(n) {}

    /// Value to add/sub from
    int n_;

    /**
     * Add n to value in JSON request.
     *
     * @param request JSON request with field "value".
     * @return JSON response containing modified field "value" = [original_value] + n.
     */
    [[nodiscard]] Json::Value add_n(const Json::Value &request) const
    {
        Json::Value resp;
        resp["SUCCESS"] = true;

        // If value is present in request, return value + 1, else return error.
        if (request.get("VALUE", NULL) != NULL) {
            resp["VALUE"] = request["VALUE"].asInt() + this->n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }

    /**
     * Sun n from value in JSON request.
     *
     * @param request JSON request with field "value".
     * @return JSON response containing modified field "value" = [original_value] - n.
     */
    [[nodiscard]] Json::Value sub_n(const Json::Value &request) const
    {
        Json::Value resp, value;
        resp["SUCCESS"] = true;

        // If value is present in request, return value + 1, else return error.
        if (request.get("VALUE", NULL) != NULL) {
            resp["VALUE"] = request["VALUE"].asInt() - this->n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }
};

次に、メソッドをコマンド文字列にマッピングするサーバーをインスタンス化します。大量のリクエストを送信すると、1 秒以内に、sendまたはwrite-1 が返され、プログラムが実行時エラーをスローします。

    Server<RequestClassMethod, RequestClass> *server_;
    auto *request_inst = new RequestClass(1);
    std::map<std::string, RequestClassMethod> commands {{"ADD_1", std::mem_fn(&RequestClass::add_n)},
                                                        {"SUB_1", std::mem_fn(&RequestClass::sub_n)}};
    server_ = new Server<RequestClassMethod, RequestClass>(5000, 50, &commands, request_inst);
    server_->RunInBackground();
    // Since the server is running in a new thread, spin until it completes setup.
    while(! server_->listening_) {}
    Client *request_maker_ = new Client;

    // Formulate request asking for value of 1 - 1.
    Json::Value sub_one_req;
    sub_one_req["COMMAND"] = "SUB_1";
    sub_one_req["VALUE"] = 1;

    // Send request, expect value of 0 and successful return code.
    Json::Value sub_one_resp = request_maker_->make_request("127.0.0.1", 5000, sub_one_req);
    while(true) {
        Json::Value sub_one_resp = request_maker_->make_request("127.0.0.1", 5000, sub_one_req);
    }

うまくいけば、これで再現するのに十分です。以前の質問が不適切であった場合は申し訳ありません。

EDIT 4:これは、プログラムごとの最大ファイル記述子に関連する問題である可能性があることに気付きました。ただし、私のプログラムは、失敗する前に正確に 2031 の要求応答サイクルを完了します。これは、最大ファイル記述子の上限と一致していないようです。

4

0 に答える 0