1

node.js には、再フォーマットしてデータベースに書き込みたい読み取りストリームがあります。読み取りストリームが高速で書き込みが遅いため、書き込みのキューが蓄積されると、node.js キューが圧倒される可能性があります (ストリームが GB のデータであると仮定します)。ブロックせずにこれが起こらないように、コードの書き込み部分を強制的に待機させるにはどうすればよいですか?

var request = http.get({
      host: 'api.geonames.org',
      port: 80,
      path: '/children?' + qs.stringify({
      geonameId: geonameId,
      username: "demo"
   })
}).on('response', function(response) {
   response.setEncoding('utf8');
   var xml = new XmlStream(response, 'utf8');

   xml.on('endElement: geoname ', function(input) {  
      console.log('geoname');
      var output = new Object();
      output.Name = input.name;
      output.lat = input.lat;
      output.lng = input.lng;
      output._key = input.geonameId;
      data.db.document.create(output, data.doc, function(callback){    
         //this is really slow.
      }
      // i do not want to return from here and receive more data until the 'create' above has completed
   });  
});
4

2 に答える 2

3

昨夜、この問題に遭遇したばかりで、ハッカソンによって引き起こされた睡眠不足の状態で、次のように解決しました。

処理するジョブを送信するたびにカウンターをインクリメントし、操作が完了するとカウンターをデクリメントします。アウトバウンド トラフィックが他のサービスを圧倒しないようにするために、保留中のアウトバウンド リクエストが一定数あるときにストリームを一時停止します。コードは次のようになります。

var instream = fs.createReadStream('./combined.csv');
var outstream = new stream;
var inProcess = 0;
var paused = false;
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
    inProcess++;
    if(inProcess > 100) {
        console.log('pausing input to clear queue');
        rl.pause();
        paused = true;
    }

    someService.doSomethingSlow(line, function() {
        inProcess--;
        if(paused && inProcess < 10) {
            console.log('resuming stream');
            paused = false;
            rl.resume();
        }

        if (err) throw err;
    });
});

rl.on('end', function() {
    rl.close();
});

最も洗練されたソリューションではありませんが、機能し、メモリ不足や他のサービスのスロットリングなしで、100 万行以上を処理できました。

于 2013-09-08T20:45:10.100 に答える
0

私のソリューションは単に空stream.Writableを拡張し、基本的に@Timothyのものと同じですが、イベントを使用し、Streams1.pause()に依存していません.resume()(とにかく、データパイプラインに影響を与えていないようです)。

var stream = require("stream");

var liveRequests = 0;
var maxLiveRequests = 100;
var streamPaused = false;

var requestClient = new stream.Writable();

function requestCompleted(){
    liveRequests--;
    if(streamPaused && liveRequests < maxLiveRequests){
        streamPaused = false;
        requestClient.emit("resumeStream");
    }
}

requestClient._write = function (data, enc, next){
    makeRequest(data, requestCompleted);
    liveRequests++;

    if(liveRequests >= maxLiveRequests){
        streamPaused = true;
        requestClient.once("resumeStream", function resume(){
            next();
        });
    }
    else {
        next();
    }
};

カウンタliveRequestsは同時要求の数を追跡し、 makeRequest()が呼び出されるたびに増分され、完了すると (つまり、 が呼び出されるとrequestCompleted()) 減分されます。リクエストが作成されたばかりで をliveRequests超えmaxLiveRequestsた場合、 でストリームを一時停止しますstreamPaused。リクエストが完了すると、ストリームは一時停止され、liveRequests現在は 未満でありmaxLiveRequests、ストリームを再開できます。後続のデータ項目は_write()そのコールバックが呼び出されたときに読み取られるため、一時停止/再開を模倣するnext()カスタム イベントのイベント リスナーを使用して後者を単純に延期できます。"resumeStream"さて、単にreadStream.pipe(requestClient).


編集:入力データの自動バッチ処理とともに、このソリューションをパッケージに抽象化しました。

于 2014-11-11T18:47:43.010 に答える