6

レコードIDの巨大な(> 1GB)CSVがあるとします。

655453
4930285
493029
4930301
493031
...

そして、それぞれについてid、REST API呼び出しを行って、レコードデータをフェッチし、ローカルで変換して、ローカルデータベースに挿入します。

Node.jsのReadableStreamでそれをどのように行いますか?

私の質問は基本的にこれです:どのようにして非常に大きなファイルを行ごとに読み取り、各行に対して非同期関数を実行し、[オプションで]特定の行からファイルの読み取りを開始できるようにしますか?

次のQuoraの質問から、私は使い方を学び始めていますfs.createReadStream

http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js

var fs = require('fs');
var lazy = require('lazy');

var stream = fs.createReadStream(path, {
  flags: 'r',
  encoding: 'utf-8'
});

new lazy(stream).lines.forEach(function(line) {
  var id = line.toString();
  // pause stream
  stream.pause();
  // make async API call...
  makeAPICall(id, function() {
    // then resume to process next id
    stream.resume();
  });
});

lazyただし、モジュールはファイル全体を(ストリームとして、ただし一時停止はありません)読み取るように強制するため、その擬似コードは機能しません。そのため、そのアプローチはうまくいかないようです。

もう1つは、特定の行からこのファイルの処理を開始できるようにしたいということです。これは、各処理id(API呼び出しの実行、データのクリーニングなど)がレコードごとに最大0.5秒かかる可能性があるため、毎回ファイルの先頭から開始する必要がないためです。私が使用することを考えている素朴なアプローチは、最後に処理されたIDの行番号をキャプチャして、それを保存することです。次に、ファイルを再度解析するときに、中断した行番号が見つかるまで、すべてのIDを1行ずつストリーミングしてから、makeAPICallビジネスを実行します。もう1つの単純なアプローチは、小さなファイル(たとえば、100個のID)を作成し、各ファイルを一度に1つずつ処理することです(IOストリームなしでメモリ内のすべてを実行するのに十分小さいデータセット)。これを行うためのより良い方法はありますか?

inには行の一部しか含まれていない可能性があるため( bufferSizeが小さい場合、各チャンクは10行になる可能性がありますが、可変長であるため、 9.5行か何か)。これが、上記の質問に対する最善のアプローチが何であるか疑問に思っている理由です。chunkstream.on('data', function(chunk) {});id

4

2 に答える 2

2

Andrew Андрей Листочкин の回答に関連:

bylineのようなモジュールを使用して、data行ごとに個別のイベントを取得できます。dataこれは、各チャンクのイベントを生成する元のファイル ストリームの変換ストリームです。これにより、各行の後に一時停止できます。

bylinelazyどうやらそうであるように、ファイル全体をメモリに読み込むことはありません。

var fs = require('fs');
var byline = require('byline');

var stream = fs.createReadStream('bigFile.txt');
stream.setEncoding('utf8');

// Comment out this line to see what the transform stream changes.
stream = byline.createStream(stream); 

// Write each line to the console with a delay.
stream.on('data', function(line) {
  // Pause until we're done processing this line.
  stream.pause();

  setTimeout(() => {
      console.log(line);

      // Resume processing.
      stream.resume();
  }, 200);
});
于 2017-11-06T04:44:48.117 に答える
1

を使用する必要はないと思いますnode-lazyNode docsで見つけたものは次のとおりです。

イベント:data

function (data) { }

dataイベントは、Buffer(デフォルトで) または使用されたstring場合 のいずれかを発行しますsetEncoding()

つまりsetEncoding()、ストリームで呼び出すと、dataイベント コールバックは文字列パラメーターを受け入れます。.pause()次に、このコールバック内で use メソッドとメソッドを呼び出すことができます.resume()

擬似コードは次のようになります。

stream.setEncoding('utf8');
stream.addListener('data', function (line) {
    // pause stream
    stream.pause();
    // make async API call...
    makeAPICall(line, function() {
        // then resume to process next line
        stream.resume();
    });
})

ドキュメントでは、ストリームが行ごとに読み取られることを明示的に指定していませんが、ファイルストリームの場合はそうであると想定しています。少なくとも他の言語やプラットフォームでは、テキスト ストリームはそのように機能し、ノード ストリームが異なる理由はないと思います。

于 2012-06-18T08:01:43.773 に答える