私は非常に機能的な方法を書こうとしています。ストリーム処理の管理には Highland.js を使用していますが、私は初心者なので、この特殊な状況にどう対処すればよいか、かなり混乱していると思います。
ここでの問題は、ファイル ストリーム内のすべてのデータが一貫していないことです。通常、ファイルの最初の行はヘッダーです。これをメモリに保存し、後でストリーム内のすべての行を圧縮します。
これが私の最初の試みです:
var _ = require('highland');
var fs = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var headers = [];
var through = _.pipeline(
_.split(),
_.head(),
_.doto(function(col) {
headers = col.split(',');
return headers;
}),
......
_.splitBy(','),
_.zip(headers),
_.wrapCallback(process)
);
_(stream)
.pipe(through)
.pipe(output);
パイプラインの最初のコマンドは、ファイルを行ごとに分割することです。次はヘッダーを取得し、doto はそれをグローバル変数として宣言します。問題は、ストリーム内の次の数行が存在しないため、プロセスがブロックされていることです...おそらく、その上に head() コマンドがあるためです。
他のいくつかのバリエーションを試しましたが、この例は、どこに行く必要があるかの感覚を与えてくれると思います.
これに関するガイダンスは役に立ちます。また、各行に異なる値がある場合、可変長/複雑さのさまざまなストリーム操作の中でプロセスストリームを分割するにはどうすればよいかという問題も生じます。
ありがとう。
編集:私はより良い結果を生み出しましたが、その効率に疑問を抱いています.これを最適化する方法はありますか? これはまだずさんな感じです。
var through = _.pipeline(
_.split(),
_.filter(function(row) {
// Filter out bogus values
if (! row || headers) {
return true;
}
headers = row.split(',');
return false;
}),
_.map(function(row) {
return row.split(',')
}),
_.batch(500),
_.compact(),
_.map(function(row) {
return JSON.stringify(row) + "\n";
})
);
_(stream)
.pipe(through)