4

ストリームでファイバーを使用しようとしています:

var Fiber = require('fibers');
var Future = require('fibers/future');
var fs = require('fs');

function sleepForMs(ms) {
  var fiber = Fiber.current;
  setTimeout(function() {
    fiber.run();
  }, ms);
  Fiber.yield();
}

function catchError(f, onError) {
  return function () {
    var args = arguments;
    var run = function () {
      try {
        var ret = f.apply(null, args);
      }
      catch (e) {
        onError(e);
      }
      return ret;
    };
    if (Fiber.current) {
      return run();
    }
    else {
      return Fiber(run).run();
    }
  }
}

function processFile(callback) {
  var count, finished, onData, onException, onIgnoredEntry;
  count = 0;
  finished = false;
  onException = function (error) {
    if (finished) {
      console.error("Exception thrown after already finished:", error.stack || error);
    }
    if (finished) {
      return;
    }
    finished = true;
    return callback(error);
  };
  onData = function(data) {
    console.log("onData");
    if (finished) {
      return;
    }
    console.log("before sleep");
    sleepForMs(500);
    console.log("after sleep");
    throw new Error("test");
  };
  return fs.createReadStream('test.js').on('data', catchError(onData, onException)).on('end', function() {
    console.log("end");
    if (finished) {
      return;
    }
    finished = true;
    return callback(null, count);
  }).on('error', function(error) {
    console.log("error", error);
    if (finished) {
      return;
    }
    finished = true;
    return callback(error);
  });
};

Fiber(function () {
  console.log("Calling processFile");
  Future.wrap(processFile)().wait();
  console.log("processFile returned");
}).run();
console.log("back in main");

しかし、実際には機能しません。コールバック内のファイバーが終了する前に、データ コールバックが終了します。したがって、上記のコードは次のように出力します。

Calling processFile
back in main
onData
before sleep
end
processFile returned
after sleep
Exception thrown after already finished: Error: test

実際には、次のようなものにする必要があります。

Calling processFile
back in main
onData
before sleep
after sleep
end
processFile returned
Error: test
4

3 に答える 3

0

これは、wait.for (ファイバーのラッパー) を使用した実装ですhttps://github.com/luciotato/waitfor

この実装では、データ チャンクごとにファイバーが起動されるため、「n」個のタスクが並行して起動されます。ProcessFile は、すべてのファイバーが完了するまで「戻りません」。

これは、Fibers と wait.for を使用してこれを行う方法のデモですが、もちろん、これを運用環境で使用する前に、モジュール レベルの変数とすべての関数をクラスにカプセル化する必要があります。

var wait = require('wait.for');
var fs = require('fs');

var tasksLaunched=0;
var finalCallback;
var callbackDone=false;
var dataArr=[]

function sleepForMs(ms,sleepCallback) {
  setTimeout(function() {
    return sleepCallback();
  }, ms);
}

function resultReady(err,data){

    if (err){
      callbackDone = true;
      return finalCallback(err);
    }

    dataArr.push(data);
    if (dataArr.length>=tasksLaunched && !callbackDone) {
      callbackDone = true;
      return finalCallback(null,dataArr);
    }
}

function processChunk(data,callback) {
    var ms=Math.floor(Math.random()*1000);
    console.log('waiting',ms);
    wait.for(sleepForMs,ms);
    console.log(data.length,"chars");
    return callback(null,data.length);
}

function processFile(filename,callback) {
  var count, onData, onException, onIgnoredEntry;
  count = 0;
  finalCallback = callback;

  onException = function (error) {
    if (!callbackDone){
      callbackDone = true;
      return callback(error);
    }
  };

  onData = function(data) {
    console.log("onData");
    tasksLaunched++;
    wait.launchFiber(processChunk,data,resultReady);
  };

  fs.createReadStream(filename)
    .on('data', onData)
    .on('end', function() {
        console.log("end");
    })
    .on('error', function(error) {
        console.log("error", error);
        if (!callbackDone) {
            callbackDone = true;
            return callback(error);
          }
    });
};

function mainFiber() {
  console.log("Calling processFile");
  var data = wait.for(processFile,'/bin/bash');
  console.log(data.length,"results");
  console.log("processFile returned");
};

//MAIN
wait.launchFiber(mainFiber);
console.log("back in main");
于 2014-08-16T21:27:56.490 に答える