1

Mnesia に大きなテーブルがあり、さまざまな理由で (ここでは重要ではありません。選択をリモートで実行していて、サードパーティのライブラリを使用して結果をネットワーク経由で送信する必要があると言います)、1 つの選択ですべての行を選択できません。 . 一度に特定の量の列のみを取得するために、選択を既に分割しています。

たとえば、これは特定の列のみを取得するための選択の例です。

mnesia:dirty_select([table,[{{table,'$1','_','$3','$4','_','_','_'},[],['$$']}]]).

その選択を異なる列セットで 2 回実行し、結果を結合します。しかし、列の 1 つが大きすぎて 1 回の選択で取得できないことが判明しました。したがって、1 つの大きな選択を 2 つの選択に分割し、それぞれがその列の行の半分だけを取得したいと思います。たとえば、1 行おきに取得する簡単な方法はありますか? 奇数行のみを選択してから、偶数行のみを選択するようなものですか? それとも、行の前半を取得してから後半を取得する方法でしょうか?

列の 1 つからすべての行を選択し、それをインデックスとして使用して特定の行を取得しようとしました。これは機能しますが、選択を構築してから実行するにはかなりの時間がかかります。

編集:選択がリモートで実行されているという事実を十分に強調しなかったことを申し訳ありません。レコードの反復処理やファイルへの直接アクセスについては知っていますが、ここでの課題は、問題のレコードを比較的少量のコマンドで取得する必要があり、それらのコマンドをリモートで実行できる必要があることです。

例えば:

  1. 最初の列のみを選択します (単純な単一mnesia:dirty_selectコマンド)。
  2. 結果が (ネットワーク経由で) 取得されたら、それを 2 つのセットに分割し、特定のレコードを取得するための選択を構築するためのキーとして使用します (各選択には、取得するキーの長いリストが含まれますが、それらは単純な Erlang 用語であるため問題ありません。ネットワーク経由で送信)
  3. 2 で作成した 2 つのキーのセットを使用して、2 つのステップですべての行を取得します。

それは機能しますが、双方向に大量のデータを送信するため、簡単ではなく最適ではありません。選択が各列と行に含まれる特定のデータを考慮して構築されていない限り、簡単な解決策はないかもしれません (たとえば、文字「A」から「M」で始まる最初の列の名前を持つすべての行に一致します)。標準の Mnesia コマンドを使用して何が可能かはわかりませんが、より簡単な解決策を望んでいました.

4

2 に答える 2

0

自動インクリメント整数であるキーをテーブルに配置すると、QLC で大まかにこれを行うことができます。

Evens = qlc:q([Rec || Rec <- Table, (Rec#table.int_key rem 2) =:= 0]).
Odds  = qlc:q([Rec || Rec <- Table, (Rec#table.int_key rem 2) =:= 1]).

そのようなキーがない場合、またはそのようなキーが必要な場合の代替手段は、qlc:fold/3テーブルに対して使用して、1 秒おきのレコードを除外することです。Mnesia は、メモリ上の理由で必要な場合、データを反復処理するために一時ファイルを使用する必要があります。

CollectOddsFun = fun(Rec, {N, List}) ->
                     NewList = case N rem 2 of
                       0 -> [Rec|List];
                       1 -> List
                     end,
                     {N+1, NewList}
                  end,

qlc:fold(CollectOddsFun, {0, []}, Table).
于 2013-05-12T08:08:55.980 に答える
0

gen_select という名前の gen_server に基づいて動作を作成しました。これを使用して、 module 属性を使用してコールバック モジュールを記述します-behaviour(gen_select)。init/1 コールバックで、ets または dets ファイルを開き、一致仕様と制限を定義します。handle_record/2このプロセスは、ファイルの最後まで、レコードごとにコールバックを呼び出してテーブルをチャンクします。これは、私が行ってきた「ビッグデータ」の種類の作業にとって便利なパラダイムであることがわかりました。必要に応じて、mnesia テーブルの基になる ets テーブルで使用するか、mnesia:select/4 を使用するように変更できます。

%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%%     large number of records are read from an {@link //stdlib/ets. ets}
%%%     or {@link //stdlib/dets. dets} table.  This is used in an application
%%%     to have supervised workers mapping over all the records in a table.
%%%     The user will call `gen_select:start_link/3', from a supervisor, to
%%%     create a process which will iterate over the selected records of a
%%%     table.  The `init/1' callback should open the table to
%%%     be read and construct a match specification to be used to select 
%%%     records from the table.  It should return a tuple of the form:
%%%     ```
%%%     {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%%         TableType :: ets | dets
%%%         Table :: ets:tid() | atom()  % when Type=ets
%%%         Table :: dets:tab_name()     % when Type=dets
%%%         MatchSpec :: match_spec()    % see ets:select/2
%%%         Limit :: integer()           % see ets:select/3
%%%         State :: term()
%%%         Reason :: term()
%%%     '''
%%% After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%% or {@link //stdlib/dets:select/3. dets:select/3} will be called
%%% using the `match_spec()' and `Limit' returned by `init/`'.  The
%%% callback function `handle_record/2' will then be called for each
%%% record returned then `select/1' will be called to get more records.
%%% This is repeated until the end of the table is reached when the
%%% callback `terminate/2' is called with `Reason=eof'..
%%% 
-module(gen_select).
-author('vance@wavenet.lk').

%% export the gen_select API
-export([start_link/3]).

%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).

%% exports used internally
-export([init_it/6]).

%% define the callback exports of a module behaving as gen_select
-type state() :: term().
-callback init(Args :: term()) ->
    {ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
        MatchSpec :: ets:match_spec(), Limit :: non_neg_integer(), State :: state()}
        | {stop, Reason :: term()} | ignore.
-callback handle_record(Record :: tuple(), State :: state()) ->
    {next_record, NewState :: state()}
        | {stop, Reason :: term(), NewState :: state()}.
-callback terminate(Reason :: eof | term(), State :: state()) ->
    any().

-import(error_logger, [format/2]).

%%----------------------------------------------------------------------
%%  gen_select API
%%----------------------------------------------------------------------

-spec start_link(Mod :: atom(), Args :: term(),
        Options :: gen:options()) -> gen:start_ret().
%% @doc Creates a {@module} process as part of a supervision tree.
%% 
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).

%%----------------------------------------------------------------------
%%  internal exports
%%----------------------------------------------------------------------

-spec init_it(Starter :: pid(), LinkP :: gen:linkage(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(), Options :: gen:options()) ->
    no_return().
%% @doc Called by {@link //stdlib/gen:start/5. gen:start/5} to initialize
%%  the process.
%%  Copied from //stdlib/gen_server:init_it/6.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) ->
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            case catch ets:select(Table, MatchSpec, Limit) of
                {Matches, Cont} when is_list(Matches) ->
                    loop1(Parent, CallBackMod, Debug, State,
                            TableMod, Cont, Matches);
                '$end_of_table' ->
                    proc_lib:init_ack(Starter, {error, eof}),
                    exit(eof);
                {error, Reason} ->
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason);
                {'EXIT', Reason} ->
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason)
            end;
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.

%%----------------------------------------------------------------------
%%  system process callbacks
%%----------------------------------------------------------------------

-type misc() :: [CallBackMod :: atom() | [State :: state()
        | [TableMod :: atom() | [Cont :: term()
        | [Matches :: [tuple()] | []]]]]].

-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to continue.
%% @private
system_continue(Parent, Debug, [CallBackMod, State,
        TableMod, Cont, Matches]) ->
    loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches).

-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to terminate.
%% @private
system_terminate(Reason, _Parent, Debug, [CallBackMod, State,
        _TableMod, _Cont, _Matches]) ->
    terminate(Reason, CallBackMod, Debug, State).

-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) ->
    {ok, NewMisc :: misc()}.
%% @doc Called by {@link //sys:handle_system_msg/6} to update `Misc'.
%% @private
system_code_change([CallBackMod, State, TableMod, Cont, Matches],
        _Module, OldVsn, Extra) ->
    case catch CallBackMod:code_change(OldVsn, State, Extra) of
        {ok, NewState} ->
            {ok, [CallBackMod, NewState, TableMod, Cont, Matches]};
        Other ->
            Other
    end.

-type pdict() :: [{Key :: term(), Value :: term()}].
-type status_data() :: [PDict :: pdict() | [SysState :: term()
        | [Parent :: pid() | [Debug :: [gen:dbg_opt()] | [Misc :: misc() | []]]]]].
-spec format_status(Opt :: normal | terminate, StatusData :: status_data()) ->
    [tuple()].
%% @doc Called by {@link //sys:get_status/1} to print state.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug, 
        [CallBackMod, State, _TableMod, _Cont, _Matches]]) ->
    Header = gen:format_status_header("Status for table reader", self()),
    Log = sys:get_debug(log, Debug, []),
    DefaultStatus = [{data, [{"State", State}]}],
    Specfic = case erlang:function_exported(CallBackMod, format_status, 2) of
        true ->
            case catch CallBackMod:format_status(Opt, [PDict, State]) of
                {'EXIT', _} ->
                    DefaultStatus;
                StatusList when is_list(StatusList) ->
                    StatusList;
                Else ->
                    [Else]
            end;
        _ ->
            DefaultStatus
    end,
    [{header, Header},
            {data, [{"Status", SysState},
                    {"Parent", Parent},
                    {"Logged events", Log}]}
            | Specfic].

%%----------------------------------------------------------------------
%%  internal functions
%%----------------------------------------------------------------------

-spec loop1(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(),
        Cont :: term(), Matches :: [tuple()]) -> no_return().
%% @doc Main loop.
%%  Copied from //stdlib/gen_server:loop1/6.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) ->
    receive
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                    [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Msg ->
            sys:handle_debug(Debug, fun print_event/3, self(), {in, Msg})
    after 0 ->
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.

-spec loop2(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -> no_return().
%% @doc Run the `select/1' function.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [H | T]) ->
    case catch CallBackMod:handle_record(H, State) of
        {next_record, NewState} ->
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, T);
        {stop, Reason, NewState} ->
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) ->
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) ->
            sys:handle_debug(Debug, fun print_event/3, self(), {read, Matches}),
            loop1(Parent, CallBackMod, Debug, State, TableMod, NewCont, Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.

-spec terminate(Reason :: term(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state()) -> no_return().
%% @doc Terminate the {@module} process.
%%  Copied from //stdlib/gen_server:terminate/6.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) ->
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', R} ->
            error_info(R, State, Debug),
            exit(R);
        _ ->
            case Reason of
                normal ->
                    exit(normal);
                shutdown ->
                    exit(shutdown);
                {shutdown, _} = Shutdown ->
                    exit(Shutdown);
                _ ->
                    FmtState = case erlang:function_exported(CallBackMod,
                            format_status, 2) of
                        true ->
                            case catch CallBackMod:format_status(terminate,
                                    [get(), State]) of
                                {'EXIT', _} ->
                                    State;
                                Else ->
                                    Else
                            end;
                        _ ->
                            State
                    end,
                    error_info(Reason, FmtState, Debug),
                    exit(Reason)
            end
    end.

-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -> ok.
%% @doc Print error log message.
%%  Copied from //stdlib/gen_server:error_info/5.
%% @hidden
error_info(Reason, State, Debug) ->
    Reason1 = case Reason of
        {undef, [{M, F, A, L} | MFAs]} ->
            case code:is_loaded(M) of
                false ->
                    {'module could not be loaded', [{M, F, A, L} | MFAs]};
                _ ->
                    case erlang:function_exported(M, F, length(A)) of
                        true ->
                            Reason;
                        false ->
                            {'function not exported', [{M, F, A, L} | MFAs]}
                    end
            end;
        _ ->
            Reason
   end,
    format("** Table reader ~p terminating \n"
            "** When Server state == ~p~n"
            "** Reason for termination == ~n** ~p~n",
            [self(), State, Reason1]),
    sys:print_log(Debug),
    ok.

%% Copied from //stdlib/gen_server:opt/2
opt(Op, [{Op, Value} | _]) ->
    {ok, Value};
opt(Op, [_ | Options]) ->
    opt(Op, Options);
opt(_, []) ->
    false.

%% Copied from //stdlib/gen_server:debug_options/2
debug_options(Name, Opts) ->
    case opt(debug, Opts) of
        {ok, Options} ->
            dbg_options(Name, Options);
        _ ->
            dbg_options(Name, [])
    end.

%% Copied from //stdlib/gen_server:dbg_options/2
dbg_options(Name, []) ->
    Opts = case init:get_argument(generic_debug) of
        error ->
            [];
        _ ->
            [log, statistics]
    end,
    dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
    dbg_opts(Name, Opts).

%% Copied from //stdlib/gen_server:dbg_opts/2
dbg_opts(Name, Opts) ->
    case catch sys:debug_options(Opts) of
        {'EXIT',_} ->
            format("~p: ignoring erroneous debug options - ~p~n",
                    [Name, Opts]),
            [];
        Dbg ->
            Dbg
    end.

-spec print_event(IoDevice :: io:device(), Event :: term(), Pid :: pid()) -> ok.
%% @doc Called by {@link //sys:handle_debug/4} to print trace events.
print_event(Dev, {in, Msg}, Pid) ->
    io:format(Dev, "*DBG* ~p got ~p~n", [Pid, Msg]);
print_event(Dev, {read, Matches}, Pid) ->
    io:format(Dev, "*DBG* ~p read ~b records~n", [Pid, length(Matches)]).
于 2013-05-13T11:01:16.030 に答える