gen_server をベースにした gen_select という名前のビヘイビアを作成しました。これを使うには、モジュール属性 `-behaviour(gen_select)` を指定してコールバックモジュールを記述します。
init/1 コールバックでは ets または dets ファイルを開き、マッチ仕様 (match specification) と一度に取得する件数の上限 (limit) を定義します。
プロセスはテーブルをチャンク単位で読み進め、ファイルの終わりに達するまで、レコードごとに handle_record/2 コールバックを呼び出します。これは、私が行ってきた「ビッグデータ」的な作業にとって便利なパラダイムであることがわかりました。必要に応じて、mnesia テーブルの基盤となる ets テーブルに対して使用することも、mnesia:select/4 を使うように変更することもできます。
%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%% large number of records are read from an {@link //stdlib/ets. ets}
%%% or {@link //stdlib/dets. dets} table. This is used in an application
%%% to have supervised workers mapping over all the records in a table.
%%% The user will call `gen_select:start_link/3', from a supervisor, to
%%% create a process which will iterate over the selected records of a
%%% table. The `init/1' callback should open the table to
%%% be read and construct a match specification to be used to select
%%% records from the table. It should return a tuple of the form:
%%% ```
%%% {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%% TableType :: ets | dets
%%% Table :: ets:tid() | atom() % when TableType=ets
%%% Table :: dets:tab_name() % when TableType=dets
%%% MatchSpec :: match_spec() % see ets:select/2
%%% Limit :: pos_integer() % see ets:select/3
%%% State :: term()
%%% Reason :: term()
%%% '''
%%% After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%% or {@link //stdlib/dets:select/3. dets:select/3} will be called
%%% using the `match_spec()' and `Limit' returned by `init/1'. The
%%% callback function `handle_record/2' will then be called for each
%%% record returned, after which `select/1' will be called to get more
%%% records. This is repeated until the end of the table is reached,
%%% at which point the callback `terminate/2' is called with `Reason=eof'.
%%%
-module(gen_select).
-author('vance@wavenet.lk').
%% export the gen_select API
-export([start_link/3]).
%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).
%% exports used internally
-export([init_it/6]).
%% define the callback exports of a module behaving as gen_select
-type state() :: term().
%% Called once when the process starts. On success it names the table
%% module to read with (`ets' or `dets'), the table, the match
%% specification and the chunk size passed to `TableType:select/3'.
%% Both ets:select/3 and dets:select/3 require `Limit' to be a positive
%% integer, so 0 is not a valid chunk size.
-callback init(Args :: term()) ->
    {ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
        MatchSpec :: ets:match_spec(), Limit :: pos_integer(), State :: state()}
    | {stop, Reason :: term()} | ignore.
%% Called for each object selected by the match specification. The object
%% is whatever the match specification body produces, which need not be a
%% tuple (e.g. a body of ['$1'] yields bare keys).
-callback handle_record(Record :: term(), State :: state()) ->
    {next_record, NewState :: state()}
    | {stop, Reason :: term(), NewState :: state()}.
%% Called when the reader stops; `Reason' is `eof' once the end of the
%% table has been reached.
-callback terminate(Reason :: eof | term(), State :: state()) ->
    any().
%% Optional: invoked during a sys code change. Any return other than
%% `{ok, NewState}' is handed back to sys unchanged.
-callback code_change(OldVsn :: term(), State :: state(), Extra :: term()) ->
    {ok, NewState :: state()}.
%% Optional: customises sys:get_status/1 output and the state printed in
%% the termination error report. `StatusData' is `[PDict, State]'.
-callback format_status(Opt :: normal | terminate,
        StatusData :: [term()]) -> term().
-optional_callbacks([code_change/3, format_status/2]).
-import(error_logger, [format/2]).
%%----------------------------------------------------------------------
%% gen_select API
%%----------------------------------------------------------------------
-spec start_link(Mod :: atom(), Args :: term(),
        Options :: gen:options()) -> gen:start_ret().
%% @doc Creates a {@module} process as part of a supervision tree.
%%
%% The new process is linked to the caller and is started through
%% gen:start/5, which ends up running `Mod:init(Args)'. A `{debug, Flags}'
%% entry in `Options' enables sys debugging for the reader.
%%
start_link(Mod, Args, Options) ->
    Linkage = link,
    gen:start(?MODULE, Linkage, Mod, Args, Options).
%%----------------------------------------------------------------------
%% internal exports
%%----------------------------------------------------------------------
-spec init_it(Starter :: pid(), Parent :: pid(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(), Options :: gen:options()) ->
    no_return().
%% @doc Called by {@link //stdlib/gen:start/5. gen:start/5} to initialize
%% the process.
%%
%% Calls `CallBackMod:init(Args)'. A `{stop, Reason}' result, `ignore',
%% a crash or any unexpected return (including a table type other than
%% `ets' or `dets') is reported to the starter and the process exits.
%% On success the starter is acknowledged with `{ok, self()}', the first
%% chunk is read with `TableType:select(Table, MatchSpec, Limit)' and the
%% main loop is entered.
%% Adapted from //stdlib/gen_server:init_it/6.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) ->
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State}
                when TableMod =:= ets; TableMod =:= dets ->
            proc_lib:init_ack(Starter, {ok, self()}),
            %% The starter has already been acknowledged, so a failure of
            %% the first read must not send a second init_ack (it would be
            %% a stray message in the starter's mailbox). It goes through
            %% terminate/4 exactly like a failure of any later read.
            %% Use the table module the callback asked for: dets tables
            %% cannot be read with ets:select/3.
            case catch TableMod:select(Table, MatchSpec, Limit) of
                {Matches, Cont} when is_list(Matches) ->
                    Debug1 = sys:handle_debug(Debug, fun print_event/3,
                            self(), {read, Matches}),
                    loop1(Parent, CallBackMod, Debug1, State,
                            TableMod, Cont, Matches);
                '$end_of_table' ->
                    terminate(eof, CallBackMod, Debug, State);
                {error, Reason} ->
                    terminate(Reason, CallBackMod, Debug, State);
                {'EXIT', Reason} ->
                    terminate(Reason, CallBackMod, Debug, State)
            end;
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.
%%----------------------------------------------------------------------
%% system process callbacks
%%----------------------------------------------------------------------
%% The `Misc' term handed to sys:handle_system_msg/6 and received back in
%% the system_* callbacks. It is always the five-element list
%% [CallBackMod, State, TableMod, Cont, Matches]. List types cannot fix a
%% length and the elements (state, continuation, pending matches) may be
%% any term, so only the element type is expressed.
-type misc() :: [term()].
-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to continue.
%%
%% Unpacks the loop state saved in `Misc' and re-enters the main loop.
%% @private
system_continue(Parent, Debug, [Mod, ModState, TabMod, Continuation, Pending]) ->
    loop1(Parent, Mod, Debug, ModState, TabMod, Continuation, Pending).
-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to terminate.
%%
%% Only the callback module and its state are needed to shut down; the
%% table module, continuation and pending matches are dropped.
%% @private
system_terminate(Reason, _Parent, Debug,
        [Mod, ModState, _TabMod, _Continuation, _Pending]) ->
    terminate(Reason, Mod, Debug, ModState).
-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) ->
    {ok, NewMisc :: misc()}.
%% @doc Called by {@link //sys:handle_system_msg/6} to update `Misc'.
%%
%% Delegates to `CallBackMod:code_change/3'. When it answers
%% `{ok, NewState}' only the state slot of `Misc' is replaced; any other
%% result (including a caught crash) is returned to sys unchanged.
%% @private
system_code_change([Mod, ModState, TabMod, Continuation, Pending],
        _Module, OldVsn, Extra) ->
    Result = (catch Mod:code_change(OldVsn, ModState, Extra)),
    case Result of
        {ok, UpgradedState} ->
            {ok, [Mod, UpgradedState, TabMod, Continuation, Pending]};
        _ ->
            Result
    end.
%% A process dictionary as returned by erlang:get/0.
-type pdict() :: [{Key :: term(), Value :: term()}].
%% The `StatusData' list passed to format_status/2 by sys: always the
%% five elements [PDict, SysState, Parent, Debug, Misc]. Those elements
%% have unrelated types and list types cannot fix a length, so only the
%% element type is expressed.
-type status_data() :: [term()].
-spec format_status(Opt :: normal | terminate, StatusData :: status_data()) ->
    [tuple()].
%% @doc Called by {@link //sys:get_status/1} to print state.
%%
%% Returns a header, the generic system data (status, parent, logged
%% events) and then the callback module's own section. That section is the
%% result of the callback's optional `format_status/2' (wrapped in a list
%% if it is not one), or `[{data, [{"State", State}]}]' when the callback
%% does not export it or it crashes.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug,
        [Mod, ModState, _TabMod, _Continuation, _Pending]]) ->
    Header = gen:format_status_header("Status for table reader", self()),
    Log = sys:get_debug(log, Debug, []),
    Fallback = [{data, [{"State", ModState}]}],
    CallbackSection =
        case erlang:function_exported(Mod, format_status, 2) of
            false ->
                Fallback;
            true ->
                case catch Mod:format_status(Opt, [PDict, ModState]) of
                    {'EXIT', _} -> Fallback;
                    Items when is_list(Items) -> Items;
                    Item -> [Item]
                end
        end,
    SystemSection = {data, [{"Status", SysState},
                            {"Parent", Parent},
                            {"Logged events", Log}]},
    [{header, Header}, SystemSection | CallbackSection].
%%----------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------
-spec loop1(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(),
        Cont :: term(), Matches :: [term()]) -> no_return().
%% @doc Main loop.
%%
%% Drains the mailbox without blocking (`after 0'). System messages are
%% handed to sys, an `EXIT' from the parent terminates the process, and
%% any other message is only reported to the sys debug handler and then
%% dropped. Once the mailbox is empty, control passes to loop2/7 to
%% process the next record.
%% Adapted from //stdlib/gen_server:loop1/6.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) ->
    receive
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                    [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Msg ->
            %% sys:handle_debug/4 returns the updated debug state. It must
            %% be kept and the loop must continue; returning its value here
            %% would end the process function and the reader would exit
            %% `normal' without calling terminate/2.
            Debug1 = sys:handle_debug(Debug, fun print_event/3, self(),
                    {in, Msg}),
            loop1(Parent, CallBackMod, Debug1, State, TableMod, Cont, Matches)
    after 0 ->
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.
-spec loop2(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(), Cont :: term(),
        Matches :: [term()]) -> no_return().
%% @doc Process the next record, or fetch the next chunk.
%%
%% With records pending, the head is passed to
%% `CallBackMod:handle_record/2' and control returns to loop1/7, so system
%% messages are checked between records. With none pending,
%% `TableMod:select(Cont)' reads the next chunk. The end-of-table marker
%% terminates with reason `eof'; an error or crash terminates with that
%% reason.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [Record | Rest]) ->
    case catch CallBackMod:handle_record(Record, State) of
        {next_record, NewState} ->
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, Rest);
        {stop, Reason, NewState} ->
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Other ->
            %% Like init_it/6 does for init/1, treat a malformed reply as
            %% `bad_return_value' and stop through terminate/4, instead of
            %% crashing with case_clause and skipping terminate/2.
            terminate({bad_return_value, Other}, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) ->
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) ->
            %% Keep the debug state returned by sys so that `log' and
            %% `statistics' debug options actually accumulate.
            Debug1 = sys:handle_debug(Debug, fun print_event/3, self(),
                    {read, Matches}),
            loop1(Parent, CallBackMod, Debug1, State, TableMod, NewCont,
                    Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.
-spec terminate(Reason :: term(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state()) -> no_return().
%% @doc Terminate the {@module} process.
%%
%% Runs `CallBackMod:terminate(Reason, State)' and then exits. If that
%% callback crashes, the crash is logged and becomes the exit reason.
%% Otherwise the process exits with `Reason', logging an error report
%% first unless `Reason' is `normal', `shutdown' or `{shutdown, _}'.
%% Copied from //stdlib/gen_server:terminate/6.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) ->
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', CallbackCrash} ->
            error_info(CallbackCrash, State, Debug),
            exit(CallbackCrash);
        _ ->
            exit_after_terminate(Reason, CallBackMod, Debug, State)
    end.

-spec exit_after_terminate(Reason :: term(), CallBackMod :: atom(),
        Debug :: [gen:dbg_opt()], State :: state()) -> no_return().
%% Exits quietly for the shutdown-style reasons; any other reason is
%% logged (with the state as rendered by state_for_report/2) before exit.
exit_after_terminate(normal, _CallBackMod, _Debug, _State) ->
    exit(normal);
exit_after_terminate(shutdown, _CallBackMod, _Debug, _State) ->
    exit(shutdown);
exit_after_terminate({shutdown, _} = Shutdown, _CallBackMod, _Debug, _State) ->
    exit(Shutdown);
exit_after_terminate(Reason, CallBackMod, Debug, State) ->
    error_info(Reason, state_for_report(CallBackMod, State), Debug),
    exit(Reason).

-spec state_for_report(CallBackMod :: atom(), State :: state()) -> term().
%% The state as it should appear in the error report: the callback's
%% `format_status(terminate, [PDict, State])' when exported and it does not
%% crash, otherwise the raw state.
state_for_report(CallBackMod, State) ->
    case erlang:function_exported(CallBackMod, format_status, 2) of
        false ->
            State;
        true ->
            case catch CallBackMod:format_status(terminate, [get(), State]) of
                {'EXIT', _} -> State;
                Formatted -> Formatted
            end
    end.
-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -> ok.
%% @doc Print error log message.
%%
%% Logs the termination reason and state through error_logger:format/2,
%% then prints any sys debug events logged so far. An `undef' reason is
%% refined by explain_undef/1.
%% Adapted from //stdlib/gen_server:error_info/5.
%% @hidden
error_info(Reason, State, Debug) ->
    error_logger:format("** Table reader ~p terminating \n"
            "** When Server state == ~p~n"
            "** Reason for termination == ~n** ~p~n",
            [self(), State, explain_undef(Reason)]),
    sys:print_log(Debug),
    ok.

-spec explain_undef(Reason :: term()) -> term().
%% Rewrites an `undef' reason to say whether the module could not be
%% loaded or the function is not exported; other reasons pass through.
explain_undef({undef, [{M, F, A, L} | MFAs]} = Reason) ->
    case code:is_loaded(M) of
        false ->
            {'module could not be loaded', [{M, F, A, L} | MFAs]};
        _ ->
            case erlang:function_exported(M, F, undef_arity(A)) of
                true ->
                    Reason;
                false ->
                    {'function not exported', [{M, F, A, L} | MFAs]}
            end
    end;
explain_undef(Reason) ->
    Reason.

-spec undef_arity(ArgsOrArity :: list() | arity()) -> arity().
%% A stack frame carries the argument list for a genuine undefined call,
%% but an integer arity when undef was raised explicitly (error(undef));
%% calling length/1 on the latter would crash the error logging itself.
undef_arity(Args) when is_list(Args) ->
    length(Args);
undef_arity(Arity) when is_integer(Arity) ->
    Arity.
%% Looks up the first `{Key, Value}' pair for `Key' in an option list,
%% returning `{ok, Value}', or `false' if there is none. Entries that are
%% not 2-tuples are skipped.
%% Copied from //stdlib/gen_server:opt/2
opt(_Key, []) ->
    false;
opt(Key, [{Key, Found} | _]) ->
    {ok, Found};
opt(Key, [_Other | Rest]) ->
    opt(Key, Rest).
%% Builds the sys debug state for the process from the `{debug, Flags}'
%% entry of the start options; without such an entry the flags are `[]'.
%% Copied from //stdlib/gen_server:debug_options/2
debug_options(Name, Opts) ->
    Flags = case opt(debug, Opts) of
                {ok, Found} -> Found;
                _ -> []
            end,
    dbg_options(Name, Flags).
%% Chooses the debug flags for the process: a non-empty flag list is used
%% as given, an empty one is replaced by default_dbg_flags/0. The result
%% is validated by dbg_opts/2.
%% Copied from //stdlib/gen_server:dbg_options/2
dbg_options(Name, []) ->
    dbg_opts(Name, default_dbg_flags());
dbg_options(Name, Opts) ->
    dbg_opts(Name, Opts).

%% Flags used when none were requested: starting the node with the
%% `-generic_debug' command-line flag turns on `log' and `statistics'.
default_dbg_flags() ->
    case init:get_argument(generic_debug) of
        error ->
            [];
        _ ->
            [log, statistics]
    end.
%% Converts debug flags into sys debug state with sys:debug_options/1.
%% If that call crashes, the flags are reported (tagged with `Name') and
%% debugging is disabled (`[]') rather than failing the process.
%% Copied from //stdlib/gen_server:dbg_opts/2
dbg_opts(Name, Opts) ->
    Result = (catch sys:debug_options(Opts)),
    case Result of
        {'EXIT', _} ->
            format("~p: ignoring erroneous debug options - ~p~n",
                    [Name, Opts]),
            [];
        _ ->
            Result
    end.
-spec print_event(IoDevice :: io:device(), Event :: term(), Pid :: pid()) -> ok.
%% @doc Called by {@link //sys:handle_debug/4} to print trace events.
%%
%% Two events are produced by this module: `{read, Matches}' when a chunk
%% has been selected (only its size is printed) and `{in, Msg}' for an
%% unexpected message.
print_event(Dev, {read, Matches}, Pid) ->
    Count = length(Matches),
    io:format(Dev, "*DBG* ~p read ~b records~n", [Pid, Count]);
print_event(Dev, {in, Msg}, Pid) ->
    io:format(Dev, "*DBG* ~p got ~p~n", [Pid, Msg]).