4

MapReduce クエリを高速化するにはどうすればよいでしょうか?

5 ノードの Riak DB クラスターを使用してアプリケーションを構築しました。私たちのデータ モデルは、試合、リーグ、チームの 3 つのバケットで構成されています。

Matches には、リーグとチームへのリンクが含まれています。

モデル

var match = {
        id: matchId,
        leagueId: meta.leagueId,
        homeTeamId: meta.homeTeamId,
        awayTeamId: meta.awayTeamId,
        startTime: m.match.startTime,
        firstHalfStartTime: m.match.firstHalfStartTime,
        secondHalfStartTime: m.match.secondHalfStartTime,
        score: {
            goals: {
                a: 1*safeGet(m.match, 'score.goals.a'),
                b: 1*safeGet(m.match, 'score.goals.b')
            },
            corners: {
                a: 1*safeGet(m.match, 'score.corners.a'),
                b: 1*safeGet(m.match, 'score.corners.b')
            }
        }
    };

var options = {
        index: {
            leagueId: match.leagueId,
            teamId: [match.homeTeamId, match.awayTeamId],
            startTime: match.startTime || match.firstHalfStartTime || match.secondHalfStartTime
        },
        links: [
            { bucket: 'leagues', key: match.leagueId, tag: 'league' },
            { bucket: 'teams', key: match.homeTeamId, tag: 'home' },
            { bucket: 'teams', key: match.awayTeamId, tag: 'away' }
        ]
    };
    match.model = 'match';
    modelCache.save('matches', match.id, match, options, callback);

クエリ

複数のバケットから結果を返すクエリを作成します。1 つの方法は、各バケットを個別にクエリすることです。もう 1 つの方法は、リンクを使用して 1 つのクエリの結果を結合することです。

試した 2 つのバージョンのクエリは、バケット サイズがどれほど小さくても、どちらも 1 秒以上かかりました。最初のバージョンでは 2 つのマップ フェーズを使用しますが、これはこの記事 (実用的なマップ削減: 転送と収集) をモデルにしています。

#!/bin/bash
curl -X POST \
-H "content-type: application/json" \
-d @- \
http://localhost:8091/mapred \
<<EOF
{
    "inputs":{
        "bucket":"matches",
        "index":"startTime_bin",
        "start":"2012-10-22T23:00:00",
        "end":"2012-10-24T23:35:00"
    },
    "query": [
        {"map":{"language": "javascript", "source":"
                function(value, keydata, arg){
                    var match = Riak.mapValuesJson(value)[0];
                    var links = value.values[0].metadata.Links;
                    var result = links.map(function(l) {
                        return [l[0], l[1], match];
                    });
                    return result;
                }
            "}
        },
        {"map":{"language": "javascript", "source": "
                function(value, keydata, arg) {
                    var doc = Riak.mapValuesJson(value)[0];
                    return [doc, keydata];
                }
            "}
        },
        {"reduce":{
            "language": "javascript",
                "source":"
                    function(values) {
                        var merged = {};
                        values.forEach(function(v) {
                            if(!merged[v.id]) {
                                merged[v.id] = v;
                            }
                        });
                        var results = [];
                        for(key in merged) {
                            results.push(merged[key]);
                        }
                        return results;
                    }
                "
            }
        }
    ]
}
EOF

2 番目のバージョンでは、4 つの個別の Map-Reduce クエリを実行して、3 つのバケットからオブジェクトを取得します。

async.series([
        //First get all matches
        function(callback) {
            db.mapreduce
                .add(inputs)
                .map(function (val, key, arg) {
                    var data = Riak.mapValuesJson(val)[0];
                    if(arg.leagueId && arg.leagueId != data.leagueId) {
                        return [];
                    }
                    var d = new Date();
                    var date = data.startTime || data.firstHalfStartTime || data.secondHalfStartTime;
                    d.setFullYear(date.substring(0, 4));
                    d.setMonth(date.substring(5, 7) - 1);
                    d.setDate(date.substring(8, 10));
                    d.setHours(date.substring(11, 13));
                    d.setMinutes(date.substring(14, 16));
                    d.setSeconds(date.substring(17, 19));
                    d.setMilliseconds(0);
                    startTimestamp = d.getTime();
                    var short = {
                        id: data.id,
                        l: data.leagueId,
                        h: data.homeTeamId,
                        a: data.awayTeamId,
                        t: startTimestamp,
                        s: data.score,
                        c: startTimestamp
                    };
                    return [short];
                }, {leagueId: query.leagueId, page: query.page}).reduce(function (val, key) {
                    return val;
                }).run(function (err, matches) {
                    matches.forEach(function(match) {
                        result.match[match.id] = match; //Should maybe filter this
                        leagueIds.push(match.l);
                        teamIds.push(match.h);
                        teamIds.push(match.a);
                    });
                    callback();
                });
        },
        //Then get all leagues, teams and lines in parallel
        function(callback) {
            async.parallel([
                //Leagues
                function(callback) {
                    db.getMany('leagues', leagueIds, function(err, leagues) {
                        if (err) { callback(err); return; }
                        leagues.forEach(function(league) {
                            visibleLeagueIds[league.id] = true;
                            result.league[league.id] = {
                                r: league.regionId,
                                n: league.name,
                                s: league.name
                            };
                        });
                        callback();
                    });
                },
                //Teams
                function(callback) {
                    db.getMany('teams', teamIds, function(err, teams) {
                        if (err) { callback(err); return; }
                        teams.forEach(function(team) {
                            result.team[team.id] = {
                                n: team.name,
                                h: team.name,
                                s: team.stats
                            };
                        });
                        callback();
                    });
                }
            ], callback);
        }
    ], function(err) {
        if (err) { callback(err); return; }
        _.each(regionModel.getAll(), function(region) {
           result.region[region.id] = {
               id: region.id,
               c: 'https://d1goqbu19rcwi8.cloudfront.net/icons/silk-flags/' + region.icon + '.png',
               n: region.name
           };
        });
        var response = {
            success: true,
            result: {
                modelRecords: result,
                paging: {
                    page: query.page,
                    pageSize: 50,
                    total: result.match.length
                },
                time: moment().diff(a)/1000.00,
                visibleLeagueIds: visibleLeagueIds
            }
        };
        callback(null, JSON.stringify(response, null, '\t'));
    });

これらのクエリを高速化するにはどうすればよいでしょうか?

追加情報:

クエリを実行するために riak-js と node.js を使用しています。

4

1 に答える 1

7

少なくとも少し高速化する 1 つの方法は、JavaScript の mapreduce 関数をジョブの一部として渡すのではなく、サーバーにデプロイすることです。( js_source_dirパラメータの説明はこちらを参照してください)。これは通常、繰り返し実行する JavaScript 関数がある場合に推奨されます。

JavaScript の mapreduce 関数を実行すると、Erlang で実装されたネイティブ関数と比較してオーバーヘッドが発生するため、可能であれば非 JavaScript 関数を使用することも役立つ場合があります。

最初のクエリの 2 つのマップ フェーズ関数は、通常のリンク フェーズ (より効率的だと思います) が処理中のレコード ( matchesレコード)を渡さないという制限を回避するように設計されているようです。最初の関数はすべてのリンクを含み、JSON 形式の追加データとして試合データを渡します。2 番目の関数は、JSON 形式で試合のデータとリンクされたレコードを渡します。

すべてのリンクと渡されたレコードの ID を含む単純な Erlang 関数を作成しました。これをネイティブ Erlang 関数riak_kv_mapreduce:map_object_valueと一緒に使用して、最初の例の 2 つのマップ フェーズ関数を置き換え、一部を削除することができます。 JavaScript の使用。既存のソリューションと同様に、複数の試合が同じリーグ/チームにリンクしている可能性があるため、多数の重複を受け取ることを期待しています.

-module(riak_mapreduce_example).

-export([map_link/3]).

%% @spec map_link(riak_object:riak_object(), term(), term()) ->
%%                   [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
%% @doc map phase function for adding linked records to result set
map_link({error, notfound}, _, _) ->
    [];
map_link(RiakObject, Props, _) ->
    Bucket = riak_object:bucket(RiakObject),
    Key = riak_object:key(RiakObject),
    Meta = riak_object:get_metadata(RiakObject),
    Current = [{{Bucket, Key}, Props}],
    Links = case dict:find(<<"Links">>, Meta) of
        {ok, List} ->
            [{{B, K}, Props} || {{B, K}, _Tag} <- List];
        error ->
            []
    end,
    lists:append([Current, Links]).

これらの結果は、集計のためにクライアントに送り返すか、提供した例のように削減フェーズ関数に渡すことができます。

サンプル関数はコンパイルしてすべてのノードにインストールする必要があり、再起動が必要になる場合があります。

パフォーマンスを向上させるもう 1 つの方法 (それは選択肢ではないかもしれません) は、データ モデルを変更して、パフォーマンスが重要なクエリに mapreduce クエリを使用する必要がないようにすることです。

于 2012-10-25T19:13:46.000 に答える