サーバーから 10 分の時間間隔でいくつかのレコードを取得しました (1 時間で 6 つのファイルを取得します) 次の数時間で 1 時間ごとに map reduce を実行したい 最後の 6 つのファイルで次のグループの map reduce を実行する必要があります時間ファイルどうすればこの問題を解決できますか? 去年の 1 か月から混乱させてください。Sushil Kr Singh に感謝します。
1 に答える
10分間のログファイルを時間ごとに要約するために、各ログファイルのタイムスタンプをmap関数で最も近い時間に切り捨て、reduce関数で結果を時間ごとにグループ化できます。
これは、mongoシェルからこれを説明する小さなダミーの例です。
10分間隔で0〜10の乱数を含む100個のログファイルを作成
logs
し、データベースのコレクションに挿入します。for (var i = 0; i < 100; i++) { d = new ISODate(); d.setMinutes(d.getMinutes() + i*10); r = Math.floor(Math.random()*11) db.logs.insert({timestamp: d, number: r}) }
logs
コレクションがどのように見えるかを確認するには、のようなクエリを送信しますdb.logs.find().limit(3).pretty()
。その結果、次のようになります。{ "_id" : ObjectId("50455a3570537f9433c1efb2"), "timestamp" : ISODate("2012-09-04T01:32:37.370Z"), "number" : 2 } { "_id" : ObjectId("50455a3570537f9433c1efb3"), "timestamp" : ISODate("2012-09-04T01:42:37.370Z"), "number" : 3 } { "_id" : ObjectId("50455a3570537f9433c1efb4"), "timestamp" : ISODate("2012-09-04T01:52:37.370Z"), "number" : 8 }
mapf
タイムスタンプを最も近い時間(切り捨て)に丸めるマップ関数(この例では)を定義します。これは、emitキーに使用されます。放出値は、そのログファイルの番号です。mapf = function () { // round down to nearest hour d = this.timestamp; d.setMinutes(0); d.setSeconds(0); d.setMilliseconds(0); emit(d, this.number); }
放出されたすべての値(つまり数値)を合計するreduce関数を定義します。
reducef = function (key, values) { var sum = 0; for (var v in values) { sum += values[v]; } return sum; }
次に、ログコレクションでmap/reduceを実行します。ここでの
out
パラメーターは、結果をhourly_logs
コレクションに書き込み、既存のドキュメントを新しい結果とマージすることを指定します。これにより、後で送信されたログファイル(サーバーの障害やその他の遅延など)がログに表示された後、結果に含まれるようになります。db.logs.mapReduce(mapf, reducef, {out: { merge : "hourly_logs" }})
最後に、結果を確認するには、次の簡単な検索をクエリできます
hourly_logs
。db.hourly_logs.find() { "_id" : ISODate("2012-09-04T02:00:00Z"), "value" : 33 } { "_id" : ISODate("2012-09-04T03:00:00Z"), "value" : 31 } { "_id" : ISODate("2012-09-04T04:00:00Z"), "value" : 21 } { "_id" : ISODate("2012-09-04T05:00:00Z"), "value" : 40 } { "_id" : ISODate("2012-09-04T06:00:00Z"), "value" : 26 } { "_id" : ISODate("2012-09-04T07:00:00Z"), "value" : 26 } { "_id" : ISODate("2012-09-04T08:00:00Z"), "value" : 25 } { "_id" : ISODate("2012-09-04T09:00:00Z"), "value" : 46 } { "_id" : ISODate("2012-09-04T10:00:00Z"), "value" : 27 } { "_id" : ISODate("2012-09-04T11:00:00Z"), "value" : 42 } { "_id" : ISODate("2012-09-04T12:00:00Z"), "value" : 43 } { "_id" : ISODate("2012-09-04T13:00:00Z"), "value" : 35 } { "_id" : ISODate("2012-09-04T14:00:00Z"), "value" : 22 } { "_id" : ISODate("2012-09-04T15:00:00Z"), "value" : 34 } { "_id" : ISODate("2012-09-04T16:00:00Z"), "value" : 18 } { "_id" : ISODate("2012-09-04T01:00:00Z"), "value" : 13 } { "_id" : ISODate("2012-09-04T17:00:00Z"), "value" : 25 } { "_id" : ISODate("2012-09-04T18:00:00Z"), "value" : 7 }
結果は、10分間のログの時間ごとの要約であり、_idフィールドには時間の開始が含まれ、valueフィールドには乱数の合計が含まれます。あなたの場合、異なる集計演算子を使用している可能性があります。必要に応じてreduce関数を変更します。
Sammayeがコメントで述べたように、1時間ごとに実行するcronジョブエントリを使用してmap/reduce呼び出しを自動化できます。
毎回完全なログコレクションを処理したくない場合は、次のようにドキュメントを1時間ごとの時間枠に制限することで、増分更新を実行できます。
var q = { $and: [ {timestamp: {$gte: new Date(2012, 8, 4, 12, 0, 0) }},
{timestamp: {$lt: new Date(2012, 8, 4, 13, 0, 0) }} ] }
db.logs.mapReduce(mapf, reducef, {query: q, out: { merge : "hourly_logs" }})
これには、12時から13時までのログファイルのみが含まれます。Date()オブジェクトの月の値は0(8 = 9月)から始まることに注意してください。オプションがあるため、merge
すでに処理されたログファイルでm/rを実行しても安全です。