8

非常に大きなファイルを処理するために、読み取り可能な変換ストリームを使用しようとしています。私が直面しているように見える問題は、最後に書き込み可能なストリームを配置しないと、結果が返される前にプログラムが終了するように見えることです。

例 :rstream.pipe(split()).pipe(tstream)

tstreamには、カウンターがしきい値に達したときに放出するエミッターがあります。そのしきい値が低い数値に設定されている場合は結果が得られますが、高い場合は何も返されません。ファイルライターにパイプすると、常に結果が返されます。明らかな何かが欠けていますか?

コード:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');

var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {

    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream);

    // this always works
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);

};

ここにコードがありますQtransformstream

// Dependencies
var Transform = require('stream').Transform,
    util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
    // Create this as a Transform Stream
    Transform.call(this, {
        objectMode: true
    });
    // Default the Qbase to 32 as an assumption
    this.Qbase = 32;
    if (Quser) {
        this.Quser = Quser;
    } else {
        this.Quser = 20;
    }
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
    // Variables used as intermediates
    this.Qmin = 120;
    this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
    var Qmin = this.Qmin;
    var Qmax = this.Qmax;
    var Qbase = this.Qbase;
    var Quser = this.Quser;
    this.Counter++;
    // Stop the stream after 100 reads and emit the data
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    // do some calcs on this.Qbase

    this.push('something not important');
    callback();
};
// export the object
module.exports = TransformStream;
4

3 に答える 3

8

編集:

また、カウンターがどれだけ高くなるかはわかりませんが、バッファーがいっぱいになると、変換ストリームにデータが渡されなくなります。この場合completed、カウンターの制限に達することがないため、実際にはヒットしません。を変更してみてくださいhighwatermark

EDIT 2: もう少し良い説明

よく知られているように、atransform stream は二重ストリームです。これは基本的に、ソースからデータを受け取り、宛先にデータを送信できることを意味します。これは一般に、それぞれ読み取りと書き込みと呼ばれます。はNode.js によって実装されたと のtransform stream両方から継承します。ただし、_read または _write 関数を実装する必要はありません。その意味では、あまり知られていないパススルー ストリームと考えることができます。read streamwrite streamtransform stream

transform streamが を実装しているという事実について考える場合write stream、書き込みストリームには常にその内容をダンプする宛先があるという事実についても考える必要があります。あなたが抱えている問題は、を作成するときにtransform stream、コンテンツを送信する場所を指定できないことです。 変換ストリームを介してデータを完全に渡す唯一の方法は、それを書き込みストリームにパイプすることです。そうしないと、本質的にストリームがバックアップされ、それ以上のデータを受け入れることができなくなります。これは、データの移動先がないためです。

これが、書き込みストリームにパイプするときに常に機能する理由です。書き込みストリームは、データを宛先に送信することでデータのバックアップを軽減しているため、すべてのデータがパイプされ、完了のイベントが発行されます。

サンプル サイズが小さいときにコードが書き込みストリームなしで機能する理由は、ストリームがいっぱいになっていないためです。そのため、変換ストリームは、完全なイベント/しきい値に達するのに十分なデータを受け入れることができます。しきい値が増加しても、別の場所 (書き込みストリーム) に送信せずにストリームが受け入れることができるデータの量は変わりません。これにより、ストリームがバックアップされ、データを受け入れることができなくなります。つまり、完了したイベントは発行されません。

あえて言いますhighwatermarkが、変換ストリームの値を上げれば、しきい値を上げてもコードは機能します。ただし、この方法は正しくありません。データを dev/null に送信する書き込みストリームにストリームをパイプします。その書き込みストリームを作成する方法は次のとおりです。

var writer = fs.createWriteStream('/dev/null');

バッファリングに関する Node.js ドキュメントのセクションでは、発生しているエラーについて説明しています。

于 2015-08-28T03:41:03.543 に答える
1

Transform ストリームではなくWritableを使用することをお勧めします。次に名前を変更_transformする_writeと、コードはストリームにパイプするとストリームを消費します。@Bradgnar が既に指摘したように、変換ストリームにはコンシューマーが必要です。そうしないと、読み取り可能なストリームがそれ以上のデータをバッファーにプッシュするのを停止します。

于 2015-09-03T17:48:11.040 に答える