1

整数の大きなコレクションがあるとします (50,000,000 など)。

関数にパラメーターとして渡された値を超えないコレクション内の最大の整数を返す関数を書きたいと思います。たとえば、値が次の場合:

 Values = [ 10, 20, 30, 40, 50, 60]

20をfind(Values, 25)返す必要があります。

関数は 1 秒間に何度も呼び出され、コレクションは大きくなります。ブルートフォース検索のパフォーマンスが遅すぎると仮定すると、それを行う効率的な方法は何でしょうか? 整数はめったに変更されないため、最速のアクセスを提供するデータ構造に格納できます。

gb_trees を見てきましたが、「挿入ポイント」を取得してから前のエントリを取得できるとは思いません。

独自のツリー構造を構築するか、ソートされた配列をバイナリ チョッピングすることで、これをゼロから実行できることはわかっていますが、見落としている組み込みの方法はありますか?

4

5 に答える 5

4

並べ替えられていない大きなリストで最も近い値を見つけるには、分割統治戦略を使用し、リストのさまざまな部分を並行して処理することをお勧めします。しかし、リストの十分な小さな部分は順次処理される可能性があります

これがあなたのためのコードです:

-module( finder ).
-export( [ nearest/2 ] ).

-define( THRESHOLD, 1000 ).

%%
%% sequential finding of nearest value
%%
%% if nearest value doesn't exists - return null
%%
nearest( Val, List ) when length(List) =< ?THRESHOLD ->
        lists:foldl(
                fun
                ( X, null ) when X < Val ->
                        X;
                ( _X, null ) ->
                        null;
                ( X, Nearest ) when X < Val, X > Nearest ->
                        X;
                ( _X, Nearest ) ->
                        Nearest
                end,
                null,
                List );
%%
%% split large lists and process each part in parallel
%%
nearest( Val, List ) ->
        { Left, Right } = lists:split( length(List) div 2, List ),
        Ref1 = spawn_nearest( Val, Left ),
        Ref2 = spawn_nearest( Val, Right ),
        Nearest1 = receive_nearest( Ref1 ),
        Nearest2 = receive_nearest( Ref2 ),
        %%
        %% compare nearest values from each part
        %%
        case { Nearest1, Nearest2 } of
                { null, null } ->
                        null;
                { null, Nearest2 } ->
                        Nearest2;
                { Nearest1, null } ->
                        Nearest1;
                { Nearest1, Nearest2 } when Nearest2 > Nearest1 ->
                        Nearest2;
                { Nearest1, Nearest2 } when Nearest2 =< Nearest1 ->
                        Nearest1
        end.

spawn_nearest( Val, List ) ->
        Ref = make_ref(),
        SelfPid = self(),
        spawn(
                fun() ->
                        SelfPid ! { Ref, nearest( Val, List ) }
                end ),
        Ref.

receive_nearest( Ref ) ->
        receive
                { Ref, Nearest } -> Nearest
        end.

ここに画像の説明を入力

シェルでのテスト:

1> c(finder).
{ok,finder}
2> 
2> List = [ random:uniform(1000) || _X <- lists:seq(1,100000) ].
[444,724,946,502,312,598,916,667,478,597,143,210,698,160,
 559,215,458,422,6,563,476,401,310,59,579,990,331,184,203|...]
3> 
3> finder:nearest( 500, List ).
499
4>
4> finder:nearest( -100, lists:seq(1,100000) ).
null
5> 
5> finder:nearest( 40000, lists:seq(1,100000) ).
39999
6> 
6> finder:nearest( 4000000, lists:seq(1,100000) ).
100000

パフォーマンス: (単一ノード)

7> 
7> timer:tc( finder, nearest, [ 40000, lists:seq(1,10000) ] ). 
{3434,10000}
8> 
8> timer:tc( finder, nearest, [ 40000, lists:seq(1,100000) ] ).
{21736,39999}
9>
9> timer:tc( finder, nearest, [ 40000, lists:seq(1,1000000) ] ).
{314399,39999}

単純な反復と比較:

1> 
1> timer:tc( lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,10000) ] ).
{14994,null}
2> 
2> timer:tc( lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,100000) ] ).
{141951,null}
3>
3> timer:tc( lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,1000000) ] ).
{1374426,null}

したがって、1000000 個の要素を持つリストでは、 functionfinder:nearestは.lists:foldl

あなたの場合、最適な値を見つけることができTHRESHOLDます。

また、異なるノードでプロセスを生成すると、パフォーマンスが向上する可能性があります。

于 2012-09-06T23:23:46.463 に答える
3

ets を使用する別のコード サンプルを次に示します。ルックアップはほぼ一定の時間で行われると思います:

1> ets:new(tab,[named_table, ordered_set, public]).
2> lists:foreach(fun(N) -> ets:insert(tab,{N,[]}) end, lists:seq(1,50000000)).
3> timer:tc(fun() -> ets:prev(tab, 500000) end).
{21,499999}
4> timer:tc(fun() -> ets:prev(tab, 41230000) end).
{26,41229999}

もちろん、周囲のコードはこれよりも少し多くなりますが、かなりきれいです

于 2012-09-10T08:36:14.947 に答える
1

私の意見では、頻繁に変更されない膨大なデータのコレクションがある場合は、それを整理することを検討する必要があります。挿入、削除機能など、順序付きリストに基づいた簡単なものを作成しました。挿入と検索の両方で良い結果が得られます。

-module(finder).

-export([test/1,find/2,insert/2,remove/2,new/0]).

-compile(export_all).

new() -> [].

insert(V,L) -> 
    {R,P} = locate(V,L,undefined,-1),
    insert(V,R,P,L).

find(V,L) -> 
    locate(V,L,undefined,-1).

remove(V,L) ->  
    {R,P} = locate(V,L,undefined,-1),
    remove(V,R,P,L).

test(Max) -> 
    {A,B,C} = erlang:now(),
    random:seed(A,B,C),
    L = lists:seq(0,100*Max,100),
    S = random:uniform(100000000),
    I = random:uniform(100000000),
    io:format("start insert at ~p~n",[erlang:now()]),
    L1 = insert(I,L),
    io:format("start find at ~p~n",[erlang:now()]),
    R = find(S,L1),
    io:format("end at ~p~n result is ~p~n",[erlang:now(),R]).

remove(_,_,-1,L) -> L;
remove(V,V,P,L) ->
    {L1,[V|L2]} = lists:split(P,L),
    L1 ++ L2;
remove(_,_,_,L) ->L.

insert(V,V,_,L) -> L;
insert(V,_,-1,L) -> [V|L];
insert(V,_,P,L) ->
    {L1,L2} = lists:split(P+1,L),
    L1 ++ [V] ++ L2.

locate(_,[],R,P) -> {R,P};
locate (V,L,R,P) -> 
    %% io:format("locate, value = ~p, liste = ~p, current result = ~p, current pos = ~p~n",[V,L,R,P]),
    {L1,[M|L2]} = lists:split(Le1 = (length(L) div 2), L),
    locate(V,R,P,Le1+1,L1,M,L2).

locate(V,_,P,Le,_,V,_) -> {V,P+Le};
locate(V,_,P,Le,_,M,L2) when V > M -> locate(V,L2,M,P+Le);
locate(V,R,P,_,L1,_,_) -> locate(V,L1,R,P).

次の結果が得られます

(exec @ WXFRB1824L)6> finder:test(10000000)。

{1347,28177,618000}で挿入を開始します

{1347,28178,322000}で検索を開始します

{1347,28178,728000}で終了

結果は{72983500,729836}です

つまり、10 000 000要素のリストに新しい値を挿入する場合は704ミリ秒、同じリスト内で最も近い値を見つける場合は406ミリ秒です。

于 2012-09-07T22:32:59.827 に答える
1

したがって、入力がソートされていない場合は、次のようにして線形バージョンを取得できます。

closest(Target, [Hd | Tl ]) ->
        closest(Target, Tl, Hd).

closest(_Target, [], Best) -> Best;
closest(Target, [ Target | _ ], _) -> Target;
closest(Target, [ N | Rest ], Best) ->
    CurEps = erlang:abs(Target - Best),
    NewEps = erlang:abs(Target -  N),
    if NewEps < CurEps ->
            closest(Target, Rest, N);
       true ->
            closest(Target, Rest, Best)
    end.

入力がソートされている場合は、より適切に実行できるはずです。

ここでは、最も近い値がターゲット値よりも高くなるように、「最も近い」ための独自のメトリックを発明しました。必要に応じて、「最も近いが大きくない」に変更できます。

于 2012-09-06T21:37:50.350 に答える
0

上記で提案したアルゴリズムのパフォーマンスに関するより正確な情報を取得しようとしました。Stemm の非常に興味深いソリューションを読んで、tc:timer/3 関数を使用することにしました。大きな欺瞞:o)。私のラップトップでは、時間の精度が非常に悪くなりました。そこで、自宅の PC を使用するために、corei5 (2 コア * 2 スレッド) + 2Gb DDR3 + Windows XP 32 ビットを残すことにしました: Phantom (6 コア) + 8Gb + Linux 64 ビット。

tc:timer が期待どおりに動作するようになり、100 000 000 の整数のリストを操作できるようになりました。各ステップで長さ関数を呼び出すために多くの時間を失っていることがわかったので、それを避けるためにコードを少しリファクタリングしました。

-module(finder).

-export([test/2,find/2,insert/2,remove/2,new/0]).

%% interface

new() -> {0,[]}.

insert(V,{S,L}) -> 
    {R,P} = locate(V,L,S,undefined,-1),
    insert(V,R,P,L,S).

find(V,{S,L}) -> 
    locate(V,L,S,undefined,-1).

remove(V,{S,L}) ->  
    {R,P} = locate(V,L,S,undefined,-1),
    remove(V,R,P,L,S).

remove(_,_,-1,L,S) -> {S,L};
remove(V,V,P,L,S) ->
    {L1,[V|L2]} = lists:split(P,L),
    {S-1,L1 ++ L2};
remove(_,_,_,L,S) ->{S,L}.

%% local

insert(V,V,_,L,S) -> {S,L};
insert(V,_,-1,L,S) -> {S+1,[V|L]};
insert(V,_,P,L,S) ->
    {L1,L2} = lists:split(P+1,L),
    {S+1,L1 ++ [V] ++ L2}.

locate(_,[],_,R,P) -> {R,P};
locate (V,L,S,R,P) -> 
    S1 = S div 2,
    S2 = S - S1 -1,
    {L1,[M|L2]} = lists:split(S1, L),
    locate(V,R,P,S1+1,L1,S1,M,L2,S2).

locate(V,_,P,Le,_,_,V,_,_) -> {V,P+Le};
locate(V,_,P,Le,_,_,M,L2,S2) when V > M -> locate(V,L2,S2,M,P+Le);
locate(V,R,P,_,L1,S1,_,_,_) -> locate(V,L1,S1,R,P).

%% test

test(Max,Iter) -> 
    {A,B,C} = erlang:now(),
    random:seed(A,B,C),
    L = {Max+1,lists:seq(0,100*Max,100)},
    Ins = test_insert(L,Iter,[]),
    io:format("insert:~n~s~n",[stat(Ins,Iter)]),
    Fin = test_find(L,Iter,[]),
    io:format("find:~n ~s~n",[stat(Fin,Iter)]).

test_insert(_L,0,Res) -> Res;
test_insert(L,I,Res) ->
    V = random:uniform(1000000000),
    {T,_} = timer:tc(finder,insert,[V,L]),
    test_insert(L,I-1,[T|Res]).

test_find(_L,0,Res) -> Res;
test_find(L,I,Res) ->
    V = random:uniform(1000000000),
    {T,_} = timer:tc(finder,find,[V,L]),
    test_find(L,I-1,[T|Res]).

stat(L,N) ->
    Aver = lists:sum(L)/N,
    {Min,Max,Var} = lists:foldl(fun (X,{Mi,Ma,Va}) -> {min(X,Mi),max(X,Ma),Va+(X-Aver)*(X-Aver)} end, {999999999999999999999999999,0,0}, L),
    Sig = math:sqrt(Var/N),
    io_lib:format("   average: ~p,~n   minimum: ~p,~n   maximum: ~p,~n   sigma   : ~p.~n",[Aver,Min,Max,Sig]).

ここにいくつかの結果があります。

1> finder:test(1000,10)。入れる:

平均: 266.7,

最小: 216,

最大: 324,

シグマ : 36.98121144581393。

探す:

average: 136.1,

最小: 105,

最大: 162,

シグマ : 15.378231367748375。

わかった

2> finder:test(100000,10).

入れる:

平均: 10096.5,

最小: 9541、

最大: 12222,

シグマ : 762.5642595873478.

探す:

average: 5077.4,

最小: 4666、

最大: 6937,

シグマ : 627.126494417195.

わかった

3> finder:test(1000000,10).

入れる:

平均: 109871.1,

最小: 94747、

最大: 139916,

シグマ : 13852.211285206417.

検索: 平均: 40428.0,

最小: 31297、

最大: 56965,

シグマ : 7797.425562325042。

わかった

4> ファインダー: テスト (100000000,10).

入れる:

平均: 8067547.8,

最小: 6265625、

最大: 16590349,

シグマ : 3199868.809140206。

探す:

average: 8484876.4,

最小: 5158504、

最大: 15950944、

シグマ : 4044848.707872872.

わかった

100 000 000 リストでは遅く、マルチプロセス ソリューションはこの二分法アルゴリズムでは役に立ちません。とにかくマルチコアを使用することができます。

パスカル。

于 2012-09-10T04:45:42.917 に答える