0

私はいくつかのcsvファイルに200万のデータを持っています...私はそれらのファイルを取得し、それを減らして、このような結果の合計を取得しています.,..

{ "_id" : "08-08-2012 05:00", "value" : { "CollectionDate" : "08-08-2012 05:00", "count" : 20000 } }
{ "_id" : "08-08-2012 05:15", "value" : { "CollectionDate" : "08-08-2012 05:15", "count" : 20000 } }
{ "_id" : "08-08-2012 05:30", "value" : { "CollectionDate" : "08-08-2012 05:30", "count" : 20000 } }
{ "_id" : "08-08-2012 05:45", "value" : { "CollectionDate" : "08-08-2012 05:45", "count" : 20000 } }
{ "_id" : "08-08-2012 06:00", "value" : { "CollectionDate" : "08-08-2012 06:00", "count" : 20001 } }
{ "_id" : "08-08-2012 06:15", "value" : { "CollectionDate" : "08-08-2012 06:15", "count" : 20000 } }
{ "_id" : "08-08-2012 06:30", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
....
{ "_id" : "08-08-2012 10:30", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
{ "_id" : "08-08-2012 10:45", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
{ "_id" : "08-08-2012 11:00", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }

この結果を取得するためのソース コードは次のとおりです。

class DirectImport
{
    private static bool forOnce = true;
    static void Main(string[] args)
    {
        Stopwatch stopwatch = new Stopwatch();
        bool reset = true;
        BsonString lastAggregatedDate = "";
        String lastCollName = "";
        MongoServer mongo = MongoServer.Create();
        mongo.Connect();
        Console.WriteLine("Connected"); Console.WriteLine();
        var db = mongo.GetDatabase("SampleData");
        using (mongo.RequestStart(db))
        {
            List<BsonDocument> mDoc = new List<BsonDocument>();
            IEnumerable<String> CollectionList = Directory.EnumerateFiles(@"c:\mongoDb\Final");
            DateTime oldDt = new DateTime(1, 1, 1);
            foreach (String coll in CollectionList.ToArray<String>())
            {
                String collName = coll.Substring(coll.LastIndexOf(@"\") + 1);
                collName = collName.Substring(0, collName.LastIndexOf('.'));
                //Console.WriteLine(collName);
                string[] records = File.ReadAllLines(@"C:\mongoDB\Final\" + collName + ".csv");
                mDoc = new List<BsonDocument>();

                var newCollection = db.GetCollection<BsonDocument>(collName);
                for (int l = 1; l < records.Length; l++)
                {
                    string[] abc = records[l].Split(',');
                    //Console.WriteLine(abc[1] + "_" + abc[2] + "_" + abc[5]);
                    BsonDocument book1 = new BsonDocument();
                    book1.Add("_id", BsonString.Create(abc[1] + "_" + abc[2] + "_" + abc[5]));
                    book1.Add("Base_Name", BsonString.Create(abc[0]));
                    book1.Add("ObjectType", BsonString.Create(abc[1]));
                    book1.Add("C_ID", BsonString.Create(abc[2]));
                    book1.Add("AssociateDimension1", BsonString.Create(abc[3]));
                    book1.Add("AssociateDimension2", BsonInt32.Create(abc[4]));
                    book1.Add("CollectionDateHour", BsonInt32.Create(Convert.ToDateTime(BsonString.Create(abc[5])).Hour));
                    book1.Add("CollectionDate", BsonString.Create(abc[5]));
                    book1.Add("Granularity", BsonInt32.Create(abc[6]));
                    book1.Add("Counter1_1", BsonDouble.Create(abc[7]));
                    book1.Add("Counter1_2", BsonDouble.Create(abc[8]));
                    book1.Add("Counter1_3", BsonDouble.Create(abc[9]));
                    book1.Add("Counter1_4", BsonDouble.Create(abc[10]));
                    book1.Add("Counter1_5", BsonDouble.Create(abc[11]));
                    book1.Add("Counter1_6", BsonDouble.Create(abc[12]));
                    book1.Add("Counter1_7", BsonDouble.Create(abc[13]));
                    if (forOnce)
                    {
                        forOnce = false;
                        oldDt = Convert.ToDateTime(BsonString.Create(abc[5]));
                        lastCollName = collName;
                    }
                    DateTime dt = Convert.ToDateTime(BsonString.Create(abc[5]));
                    mDoc.Add(book1);
                    newCollection.Insert(book1);
                    String[] strparam = { "CollectionDate" };
                    if (!newCollection.IndexExists(strparam))
                        newCollection.CreateIndex(strparam);
                    if (oldDt > dt)
                        Console.WriteLine(dt + ":::" + oldDt);
                    if (((dt - oldDt).Hours >= 1) || oldDt > dt)
                    {
                        stopwatch.Start();
                        Console.WriteLine("Calling MAPReduce at: " + oldDt);
                        Console.WriteLine();
                        //var coll1 = db.GetCollection<BsonDocument>("MSS1");
                        //coll1.InsertBatch<BsonDocument>(mDoc);
                        MapReduce(db, lastCollName, dt, lastAggregatedDate, reset);
                        lastCollName = collName;
                        //coll1.Drop();
                        lastAggregatedDate = BsonString.Create(abc[5]);
                        reset = false;
                        mDoc = new List<BsonDocument>();
                        oldDt = dt;
                        TimeSpan ts = stopwatch.Elapsed;
                        Console.WriteLine("RunTime: " + ts.Hours + ":" + ts.Minutes + ":" + ts.Seconds + ":" + ts.Milliseconds / 10);
                        stopwatch.Stop();
                    }
                }
            }
        }
        Console.Read();
        mongo.Disconnect();
    }
private static void MapReduce(MongoDatabase db, String collName, BsonValue bsonValue, BsonString lastAggregateDate, bool reset)
    {
        var collection = db.GetCollection<BsonDocument>(collName);
        Console.WriteLine(collName);
        String map = @"function() { 
                                    emit(this.CollectionDate, {CollectionDate : this.CollectionDate, count: 1});
                                  }";
        String reduce = @"function(key, values) {
                                        var result = {CollectionDate:key, count: 0};
                                        values.forEach(function(value){
                                            result.count += value.count;
                                        });
                                        return result;
                                    }";
        var options = new MapReduceOptionsBuilder();
        IMongoQuery[] queries = { Query.GTE("CollectionDate", lastAggregateDate) };
        //if(reset)
        //    queries = new IMongoQuery[] { Query.LT("CollectionDate", bsonValue) };

        options.SetOutput(MapReduceOutput.Reduce("MSS_REDUCE"));
        //options.SetOutput(MapReduceOutput.Reduce);
        IMongoQuery query = Query.And(queries);
        var results = collection.MapReduce(queries[0], map, reduce, options);
}

問題は、必要な出力が次のとおりであることです。

{ "_id" : "08-08-2012 05:00", "value" : { "CollectionDate" : "08-08-2012 05:00", "count" : 20000 } }
{ "_id" : "08-08-2012 06:00", "value" : { "collectionDate" : "08-08-2012 06:00", "count" : 80000 } }
{ "_id" : "08-08-2012 07:00", "value" : { "collectionDate" : "08-08-2012 07:00", "count" : 80000 } }
{ "_id" : "08-08-2012 08:00", "value" : { "CollectionDate" : "08-08-2012 07:45", "count" : 80000 } }
{ "_id" : "08-08-2012 09:00", "value" : { "collectionDate" : "08-08-2012 09:00", "count" : 80000 } }
{ "_id" : "08-08-2012 10:00", "value" : { "collectionDate" : "08-08-2012 10:00", "count" : 80000 } }
{ "_id" : "08-08-2012 11:00", "value" : { "CollectionDate" : "08-08-2012 10:45", "count" : 80000 } }

今、_idキーをシャーディングすることでできると思います..しかし、C#でこれを行う方法と、この結果を達成する方法..助けてください

4

1 に答える 1

2

この質問は、私が間違っていない限り、マップの縮小を変更することで解決できます。

コメントからの私の仮定は、あなたが探しているのは、前の各時間の各時間の合計であるということです。CollectionDateHourこれは、ではなく でmapreduce できることを意味しますCollectionDate。その後、アプリケーションで をどこに配置するかを制御できますCollectionDateHour(たとえば、 がグループ内にあるかグループ6:45内にあるかを決定できます。7:006:00

アプリケーションで変更する点が 2 つあります。まず、収集日時を処理します。

for (int l = 1; l < records.Length; l++)
{
...
    var collectionDate = Convert.ToDateTime(BsonString.Create(abc[5]));
    var collectionDateHour = collectionDate.Minute>0?collectionDate.AddHours(1).Hour:collectionDate.Hour;
    book1.Add("CollectionDateHour", BsonInt32.Create(collectionDateHour));
...

. _ CollectionDateHour_ CollectionDate_ CollectionDateロジックに合わせてこれを調整できます。

CollectionDateHour2 番目の変更は、以前に使用した完全な文字列ではなく、に基づいて削減するように、map reduce 関数を調整することです。これを行う 1 つの方法は、マップ関数を変更することです。

String map = @"function() { 
    var newCollectionDate = this.CollectionDate.split(' ')[0] + (this.CollectionDateHour<10?' 0':' ')+this.CollectionDateHour + ':00'
    emit(newCollectionDate, {CollectionDate : newCollectionDate, count: 1});
}";

これにより、収集日がアプリケーションロジックで計算された時間の数字に設定され、それが短縮されます。上記の 2 つの変更を考えると、次の入力:

{ "CollectionDate" : "08-08-2012 05:00" }
{ "CollectionDate" : "08-08-2012 05:15" }
{ "CollectionDate" : "08-08-2012 05:10" }
{ "CollectionDate" : "08-08-2012 04:00" }
{ "CollectionDate" : "08-08-2012 12:45" }
{ "CollectionDate" : "08-08-2012 12:46" }
{ "CollectionDate" : "08-08-2012 23:45" }

これらの結果が得られます:

{ "_id" : "08-08-2012 00:00", "value" : { "CollectionDate" : "08-08-2012 00:00", "count" : 1 } }
{ "_id" : "08-08-2012 04:00", "value" : { "CollectionDate" : "08-08-2012 04:00", "count" : 1 } }
{ "_id" : "08-08-2012 05:00", "value" : { "CollectionDate" : "08-08-2012 05:00", "count" : 1 } }
{ "_id" : "08-08-2012 06:00", "value" : { "CollectionDate" : "08-08-2012 06:00", "count" : 2 } }
{ "_id" : "08-08-2012 13:00", "value" : { "CollectionDate" : "08-08-2012 13:00", "count" : 2 } }
于 2012-09-20T01:24:04.547 に答える