3

私は非常に機能的な方法を書こうとしています。ストリーム処理の管理には Highland.js を使用していますが、私は初心者なので、この特殊な状況にどう対処すればよいか、かなり混乱していると思います。

ここでの問題は、ファイル ストリーム内のすべてのデータが一貫していないことです。通常、ファイルの最初の行はヘッダーです。これをメモリに保存し、後でストリーム内のすべての行を圧縮します。

これが私の最初の試みです:

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

var headers = [];

var through = _.pipeline(
    _.split(),
    _.head(),
    _.doto(function(col) {
        headers = col.split(',');
        return headers;
    }),

    ......

    _.splitBy(','),
    _.zip(headers),
    _.wrapCallback(process)
);

_(stream)
    .pipe(through)
    .pipe(output);

パイプラインの最初のコマンドは、ファイルを行ごとに分割することです。次はヘッダーを取得し、doto はそれをグローバル変数として宣言します。問題は、ストリーム内の次の数行が存在しないため、プロセスがブロックされていることです...おそらく、その上に head() コマンドがあるためです。

他のいくつかのバリエーションを試しましたが、この例は、どこに行く必要があるかの感覚を与えてくれると思います.

これに関するガイダンスは役に立ちます。また、各行に異なる値がある場合、可変長/複雑さのさまざまなストリーム操作の中でプロセスストリームを分割するにはどうすればよいかという問題も生じます。

ありがとう。

編集:私はより良い結果を生み出しましたが、その効率に疑問を抱いています.これを最適化する方法はありますか? これはまだずさんな感じです。

var through = _.pipeline(
    _.split(),
    _.filter(function(row) {
        // Filter out bogus values
        if (! row || headers) {
            return true;
        }
        headers = row.split(',');
        return false;
    }),
    _.map(function(row) {
        return row.split(',')
    }),
    _.batch(500),
    _.compact(),
    _.map(function(row) {
        return JSON.stringify(row) + "\n";
    })
);

_(stream)
    .pipe(through)
4

1 に答える 1