mapReduce ジョブの入力は MongoDB コレクションであり、データ ファイルにすることはできません。
しかし、あなたの場合、「縮小」部分がないため、mapReduce() は必要ありません。入力レコードを 1:1 で出力レコードに変換するだけです。
したがって、最初のステップは、データ ファイルをコレクション「inp」に保存することです。つまり、時系列を配列としてドキュメントに保存します。データ ファイルが 16 MB を超えるドキュメントになる場合は、それをいくつかのドキュメントに分割する必要があります。この例では、ドキュメントごとに 2 つのタイムスタンプ要素のみを保存します。私はモンゴシェルのJavaScriptで例を行いました:
PATH = "/home/ronald/mongotest/";
DATA = "data.file";
ELEMS_PER_DOC = 2; // number of emelements in "series" per document
db.data.drop();
data = cat( PATH + DATA );
lines = data.split("\n")
lines = lines.splice(0,lines.length-1);
series = [];
lines.forEach(function( line ) {
if ( series.length >= ELEMS_PER_DOC ) {
db.data.insert({ "series": series });
series = [];
}
l = line.split("|");
timestamp = l[0];
d = l[1].split(",");
series.push( { "ts": timestamp, "data": d } );
});
db.data.insert({ "series": series });
指定されたデータ ファイルの場合:
6|46,36,A
7|90,45,B
8|45,12,C
9|34,67,D
これにより、次のコレクションが生成されます。
> db.data.find().pretty()
{
"_id" : ObjectId("515e735657a0887a97cc8d23"),
"series" : [
{
"ts" : "6",
"data" : [
"46",
"36",
"A"
]
},
{
"ts" : "7",
"data" : [
"90",
"45",
"B"
]
}
]
}
{
"_id" : ObjectId("515e735657a0887a97cc8d24"),
"series" : [
{
"ts" : "8",
"data" : [
"45",
"12",
"C"
]
},
{
"ts" : "9",
"data" : [
"34",
"67",
"D"
]
}
]
}
[ 注: 入力データを MongoDB に保存したくない場合は、「シリーズ」配列を作成し、これを 3 番目のステップで入力として使用します。クライアント マシンのメモリ使用量に注意してください。]
次のステップは、構成ファイルから、ルール セットに従ってデータを変換するために使用される JavaScript 関数を生成することです。実際には、ハードコーディングされた 3 次元への制限を避けるために、これは関数の配列になります。
PATH = "/home/ronald/mongotest/";
CONFIG = "config.file";
config = cat( PATH + CONFIG );
lines = config.split("\n")
lines = lines.splice(0,lines.length-1);
// array of functions - index = dimension
funcs = [];
lines.forEach(function( line ) {
x = line.split(",");
f = "";
if ( x[1] == "N" ) {
// Numeric rule:
// x[2] = granularity
// x[3],x[4] lower,upper range
// the function to be called for the given value looks like:
// function( val ) returns: the interval or "n/a" if outside range
// the interval is given by (val - (val modulo intervalSize)) / intervalSize
// the intervalSize is (max - min) / granularity
intervalSize = (x[4] - x[3]) / x[2];
f = "function (val) {";
f += " if ( val < "+x[3]+" || val > "+x[4]+" ) return 'n/a';";
f += " return (val - val % "+intervalSize+") / "+intervalSize+";";
f += "}";
} else if ( x[1] == "C" ) {
// Categoric rule:
// return the position of value in the array of params
// skip dimension and rule type
x = x.splice(2, x.length-1);
// build parameter array
pa = '[';
x.forEach( function(p) { pa += '"' + p + '",' } );
pa += ']';
// the function will return -1 if value not found in array
f = "function (val) { return "+pa+".indexOf(val) }";
}
else {
// unknown rule type
f = "function (val) { return 'rule err' }";
}
eval( "fx = "+f );
funcs.push( fx );
});
指定された構成ファイルの場合:
0,N,4,0,100
1,N,2,0,50
2,C,A,B,C,D
これにより、次の関数配列が生成されます。
> funcs
[
function (val) { if ( val < 0 || val > 100 ) return 'n/a'; return (val - val % 25) / 25;},
function (val) { if ( val < 0 || val > 50 ) return 'n/a'; return (val - val % 25) / 25;},
function (val) { return ["A","B","C","D",].indexOf(val) }
]
次に、3 番目で最後の部分: 入力コレクションから出力コレクションを作成します。
db.out.drop();
cursor = db.data.find();
cursor.forEach( function (doc) {
doc.series.forEach( function (serie) {
for ( i=0; i<serie.data.length; i++ ) {
// apply transformation function for each dimension
serie.data[i] = funcs[i]( serie.data[i] );
}
});
db.out.insert( doc );
})
最終結果は次のとおりです。
> db.out.find().pretty()
{
"_id" : ObjectId("515e974c57a0887a97cc8d2f"),
"series" : [
{
"ts" : "6",
"data" : [
1,
1,
0
]
},
{
"ts" : "7",
"data" : [
3,
1,
1
]
}
]
}
{
"_id" : ObjectId("515e974c57a0887a97cc8d30"),
"series" : [
{
"ts" : "8",
"data" : [
1,
0,
2
]
},
{
"ts" : "9",
"data" : [
1,
"n/a",
3
]
}
]
}