4

私がやろうとしているのは、gen_serverプロセスに新しいクライアントを受け入れさせ、すぐに新しい子を生成して次のクライアントを処理させることです。私が見ている問題は、ソケットが終了して結果的に終了すると、リスニングソケットも閉じて、参照しなくなっても理由が​​わからないことです。

私が間違っていることについて何か考えはありますか?

gen_server:

-module(simple_tcp).
-behaviour(gen_server).

%% API
-export([start_link/1, stop/0, start/0, start/1]).

%% gen-server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).

-record(state, {port, lsock}).

start_link({port, Port}) ->
    gen_server:start_link(?MODULE, [{port, Port}], []);

start_link({socket, Socket}) ->
    gen_server:start_link(?MODULE, [{socket, Socket}], []).

start({port, Port}) ->
    simple_tcp_sup:start_child({port, Port});

start({socket, Socket}) ->
    simple_tcp_sup:start_child({socket, Socket}).

start() ->
    start({port, ?DEFAULT_PORT}).

stop() ->
    gen_server:cast(?SERVER, stop).

% Callback functions
init([{port, Port}]) ->
    {ok, LSock} = gen_tcp:listen(Port, [{active, true},{reuseaddr, true}]),
    init([{socket, LSock}]);

init([{socket, Socket}]) ->
    io:fwrite("Starting server with socket: ~p~n", [self()]),
    {ok, Port} = inet:port(Socket),
    {ok, #state{port=Port, lsock=Socket}, 0}. 

handle_call(_Msg, _From, State) ->
    {noreply, State}.

handle_cast(stop, State) ->
    {stop, ok, State}.

handle_info({tcp, Socket, RawData}, State) ->
    gen_tcp:send(Socket, io_lib:fwrite("Received raw data: ~p~n", [RawData])),
    {noreply, State};

handle_info({tcp_error, _Socket, Reason}, State) ->
    io:fwrite("Error: ~p~n", [Reason]),
    {stop, normal, State};

handle_info(timeout, #state{lsock = LSock} = State) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            io:fwrite("Accepting connection...~p~n", [self()]),
            start({socket, LSock}),
            {noreply, #state{lsock=Sock}};

        {error, Reason} ->
            io:fwrite("Error: ~p, ~p~n", [Reason, self()]),
            {stop, normal, State}
    end;

handle_info({tcp_closed, _Port}, State) ->
    io:fwrite("Socket closed: ~p~n", [self()]),
    simple_tcp_sup:kill_child(self()),
    {stop, normal, State}.

terminate(_Reason, _State) ->
    io:fwrite("Shutting down server: ~p~n", [self()]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

スーパーバイザー:

-module(simple_tcp_sup).

-behaviour(supervisor).

-export([start_link/0,
         start_child/1
        ]). 

-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

start_child({socket, Socket}) ->
    io:fwrite("Spawning child with socket...~n"),
    supervisor:start_child(?SERVER, [{socket, Socket}]);

start_child({port, Port}) ->
    io:fwrite("Spawning child with port...~n"),
    supervisor:start_child(?SERVER, [{port, Port}]).

init([]) ->
    Element = {simple_tcp, {simple_tcp, start_link, []},
               temporary, brutal_kill, worker, [simple_tcp]},
    Children = [Element],
    RestartStrategy = {simple_one_for_one, 0, 1}, 
    {ok, {RestartStrategy, Children}}.
4

2 に答える 2

6

3 つ目はとhandle_infoの役割を逆にします。これは子プロセスに渡され、それ自体の状態は変更されません。SockLSockSock

Stateところで:ゼロから再構築するのは悪いカルマです( )。後でさらに状態変数を追加する場合に備えて、常に現在のものから#state{lsock=Sock}新しいものを派生させる必要があります( )。実際、ポート番号を破棄しているため、これには現在バグがあります (良性のものではありますが)。StateStateState#state{lsock=Sock}

于 2011-09-03T05:42:45.320 に答える
2

まあ、ソケットのものは、gen_server と非同期で通信し、それと一緒にいる別のプロセスによって処理されるようにすることをお勧めしますlinked。これを行う方法を示すサンプル コード スニペットがあります。gen_server は TCP リスナーを開始して生成します。TCP リスナーは、リッスン ソケットの取得に成功した後、gen_server に内部状態の変更を通知します。コードを上から順に並べました。関連するすべての機能が表示されました。ソケット処理プロセスと、それらが gen_server と対話する方法に注目してください。

-define(PEER_CLIENT_TIMEOUT,タイマー:秒(20))。
-define(PORT_RANGE,{10245,10265})。
-define(DEBUG(X,Y),error_logger:info_msg(X,Y))を定義します。
-define(ERROR(L),error_logger:error_report(L))を定義します。
-define(SOCKET_OPTS(IP),[inet,binary,{バックログ,100},{パケット,0},
                            {reuseaddr,true},{active,true},
                            {ip,IP}])。

%%------------------------------------------------ ----
%% gen_server はここから始まります....

開始 (ピア名)->
    gen_server:start_link({local,?MODULE},?MODULE,PeerName,[])。

%%% -----------------------------------------------
%% Gen_server init/1 関数

init(ピア名)->
    process_flag(trap_exit,true),
    %% 以下のソケット チェーン全体を開始します。
    start_link_listener(),
    %% ソケット処理が開始されました。gen_server は非同期を待機できるようになりました
    %% メッセージ
    {わかった、[]}。

%%% ---- ソケット処理関数 ----------

%% 関数: start_link_listener/0
%% 目的: リスニングのチェーン全体を開始します
%% 接続を待機しています。実行された
%% gen_server プロセスによって直接、しかし
%% は、残りを行うために別のプロセスを生成します

start_link_listener()->
    Ip_address = get_myaddr(),  
    spawn_link(fun() -> listener(?SOCKET_OPTS(Ip_address)) end)。

%%% -----------------------------------------------   
%% 関数: get_myaddr/0
%% 目的: マシンのアクティブな IP アドレスを選択して、
%% リッスン

get_myaddr()->
    ?DEBUG("サーバー> ローカル IP アドレスを抽出しようとしています....",[]),
    {ok,名前} = inet:gethostname(),
    {ok,IP} = inet:getaddr(Name,inet),
    ?DEBUG("Server> Found Alive Local IP address: ~p.....~n",[IP]),
    IP。

%%% --------------------------------------------------- ---
%% 関数: listener/1、別のプロセスで実行
%% 目的: 指定されたソケット オプションを使用して、指定された ?PORT_RANGE を試行します。
%% ListenSocket を取得すると、gen_server! をキャストします。

リスナー (SocketOpts)->
    process_flag(trap_exit,true),
    ポート = リスト:seq(要素(1,?PORT_RANGE),要素(2,?PORT_RANGE)),
    ケース try_listening(SocketOpts,Ports) の
        {OK,ポート,LSocket}->              
                PP = proplists:get_value(ip,SocketOpts),
                ?MODULE:started_listener(Port,PP,LSocket),              
                accept_connection(LSocket);
        {エラー、失敗} -> {エラー、失敗、SocketOpts}
    終わり。

try_listening(_Opts,[])-> {エラー、失敗};
try_listening(Opts,[Port|Rest])->
    case gen_tcp:listen(Port,Opts) の
        {ok,Listen_Socket} -> {ok,Port,Listen_Socket};
        {error,_} -> try_listening(Opts,Rest)
    終わり。
%%% --------------------------------------------------- ----------
%% タプルから IP アドレスを変換するためのヘルパー関数
%% から文字列へ、またはその逆

is_integer(X)-> integer_to_list(X) の場合は str(X)。

Formalise_ipaddress({A,B,C,D})->
    str(A) ++ "." ++ str(B) ++ "." ++ str(C) ++ "." ++ str(D)。

unformalise_address(文字列)->
    [A,B,C,D] = 文字列:トークン(文字列,"."),
    {list_to_integer(A),list_to_integer(B),list_to_integer(C),list_to_integer(D)}.

%%% --------------------------------------------------- ---
%% 関数: get_source_connection/1
%% 目的: 相手側の IP とポートを取得する
%% 接続の終了

get_source_connection(ソケット)->
    inet:peername(Socket) を試してください
        {ok,{IP_Address, Port}} ->
            [{ipAddress,formalise_ipaddress(IP_Address)},{ポート,ポート}];
        _ -> failed_to_retrieve_address
    キャッチ
        _:_ -> failed_to_retrieve_address
    終わり。

%%% --------------------------------------------------- ------
%% 関数: accept_connection/1
%% 目的: 接続を待ち、
%% 別のスレッドを生成することによる ListenSocket
%% それを取り、あまりにも聞く。gen_server をキャストします
%% を接続ごとに表示し、その詳細を提供します。

accept_connection(ListenSocket)->    
    case gen_tcp:accept(ListenSocket,infinity) の
        {わかりました、ソケット}->
            %% 以下の ListenSocket を再利用してください.....
            spawn_link(fun() -> accept_connection(ListenSocket) end),            
            OtherEnd = get_source_connection(ソケット),
            ?MODULE:accepted_connection(OtherEnd),          
            ループ (ソケット、その他の端);
        {error,_} = 理由 ->
            ?ERROR(["リスナーは接続の受け入れに失敗しました",
                    {listener,self()},{理由,理由}])
    終わり。

%%% --------------------------------------------------- --------------------------
%% 関数: ループ/2
%% 目的: TCP 受信ループ、gen_server をキャストします
%% 何かを受け取るとすぐに。gen_server
%% は応答の生成を担当します
%% OtherEnd ::= [{ipAddress,StringIPAddress},{Port,Port}] または 'failed_to_retrieve_address'

loop(Socket,OtherEnd)->
    受け取る
        {tcp、ソケット、データ}->
            ?DEBUG("アクセプタ: ~p がバイナリ メッセージを受信しました: ~p~n",[self(),OtherEnd]),
            Reply = ?MODULE:incoming_binary_message(Data,OtherEnd),
            gen_tcp:send(ソケット、応答)、         
            gen_tcp:close(ソケット),
            終了(通常);
        {tcp_closed, ソケット} ->
            ?DEBUG("アクセプタ: ~p。他端によってソケットが閉じられました: ~p~n",[self(),OtherEnd]),
            ?MODULE:socket_closed(OtherEnd),
            終了(通常);
        Any -> ?DEBUG("Acceptor: ~p has received a message: ~p~n",[self(),Any])
    終わり。

%%% -----------------------------------------------
%% Gen_server 非同期 API

受け入れられた接続 (failed_to_retrieve_address)-> OK;
受け入れられた_接続([{ipAddress,StringIPAddress},{Port,Port}])->     
    gen_server:cast(?MODULE,{connected,StringIPAddress,Port})。

socket_closed(failed_to_retrieve_address)-> OK;
socket_closed([{ipAddress,StringIPAddress},{Port,Port}])->
    gen_server:cast(?MODULE,{socket_closed,StringIPAddress,Port})。

incoming_binary_message(Data,_OtherEnd)-> %% バイナリ応答を期待
    case analyse_protocol(Data) の
        間違っている -> term_to_binary("プロトコル違反!");
        Val -> gen_server:call(?MODULE,{request,Val},infinity)
    終わり。

%%% -------------------- ハンドルキャスト ------------------------- -----------------

handle_cast({listener_starts,_Port,_MyTupleIP,_LSocket} = オブジェクト,状態)->
    NewState = do_something_with_the_listen_report(Object),
    {noreply,NewState};
handle_cast({connected,_StringIPAddress,_Port} = オブジェクト,状態)->
    NewState = do_something_with_the_connection_report(オブジェクト),
    {noreply,NewState};
handle_cast({socket_closed,_StringIPAddress,_Port} = オブジェクト,状態)->
    NewState = do_something_with_the_closed_connection_report(オブジェクト),
    {noreply,NewState};
handle_cast(Any,State)->
    ?DEBUG("サーバー> 不明なメッセージが表示されました: ~p~n",[Any]),
    {または、州}。


%%%% ---------------------- コールの処理 --------------
handle_call({request,Val},_,State)->
    {NewState,Reply} = req(Val,State),
    {返信、返信、NewState};
handle_call(_,_,State)-> {reply,[],State}.

req(ヴァル、状態)->
    %% gen_server の状態を変更し、
    %% ビルド応答
    {NewState,Reply} = modify_state_and_get_reply(State,Val),
    {NewState,Reply}。

%%--------------------- 終了/2 --------------------

終了 (_Reason,_State)-> わかりました。  

%%----------------- code_change/3 ------------------

code_change(_,状態,_)-> {OK,状態}.

gen_server の非同期機能により、個別のリンクされたプロセスからソケットの詳細を処理できます。cast次に、これらのプロセスは、 gen_server の同時性をブロックすることなく、gen_server と通信します。

于 2011-09-03T06:17:32.080 に答える