0

新しい mongo バルク API を使用してカスタム バルク アップロード スクリプトを作成しようとしています。最初は非常に高速に動作するものを使用しUnorderedBulkOpていますが、数回呼び出された後、ハングし始めます。ログ行を使用してみましたが、10回目の呼び出しの後、実際に爆発し始めたようです。アップロードを停止して再起動すると (重複をチェックするコードが配置されています)、最初のいくつかの呼び出しexecuteは再びパフォーマンスが向上するため、コレクション内のデータ量に依存しているようには見えません。何が起こっている?すべての操作を一括操作にプッシュし、一度だけ実行を呼び出すことを考えましたが、ここで別の回答を見て、一括操作を段階的に呼び出しexecuteました。

それはこれをやっているビットを取り除いた:

this.db.collection(collection_name, function(err, collection){
  var bulk = collection.initializeUnorderedBulkOp();
  var operations = 0;
  var dataread = fs.createReadStream(filepath, {encoding: 'utf8'});
  var current = '';

  // load and split data from CSV     
  dataread.on('data', function(data){
    dataread.pause();
    chunk = current + data;
    var split = chunk.split('\n');
    current = split.pop();
    var ids = [];

    for(i=0, len = split.length; i< len; i++){
      lineData = split[i].split(',');
      customid = parseInt(lineData[0]);
      ids.push(customid);
    }

    // find which docs already exist and need to be updated     
    collection.find({customid: {$in: ids}}).toArray(function(err, docs){
      var docmap = {};
      for(i=0, len=docs.length; i<len; i++){
        docmap[docs[i].customid] = docs[i]; 
      }

      for(isplit=0; isplit<split.length; isplit++){
        lineData = split[isplit].split(',');
        customid = parseInt(lineData[0]);

        // check for insert or update
        if(docmap[customid]){
          doc = docmap[customid];
          //update doc
          bulk.find({_id: doc._id}).update({$push: {history: 1}});    
        else{
          doc = formatData(lineData);
          bulk.insert(doc);
        }
        operations++;         
      }

      if(operations > 10000){
        bulk.execute({w: 1}, function(err, result){
          operations = 0;
          dataread.resume();
        });
      }else{
        dataread.resume();
      }    
    });
  });
});

もともと私は個々の呼び出しを使用してこれを行っていましcollection.saveたが、私のデータセットは現在 200 万個のデータポイントのオーダーであり、このアップロードを週に 1 回実行するため、最適化を検討しています。

4

1 に答える 1

0

したがって、これは一括操作の欠点のようです。連続した一括操作を処理しますが、単一の BulkOp オブジェクトは単一のバッチ プロセスのみを処理するのに最適です。bulk成功した呼び出しのコールバックで再初期化することで、これを解決することができましたexecute

bulk.execute({w: 1}, function(err, result){
   operations = 0;
   bulk = collection.initializeUnorderedBulkOp();
   dataread.resume();
});
于 2014-06-07T20:01:42.857 に答える