3

バックプレッシャーを適切に処理するストリームを実装する方法を理解することはできません。一時停止と再開を使用しないでください。

私は正しく動作するようにしようとしているこの実装を持っています:

var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    stream.Readable.call(this, {highWaterMark: highWaterMark})
    this.stream = myStream

    myStream.on('readable', function() {
        var data = myStream.read(5000)
        //process.stdout.write("Eff: "+data)
        if(data !== null) {
            if(!this.push(data)) {
                process.stdout.write("Pause")
                this.pause()
            }
            callback(data)
        }
    }.bind(this))

    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
    process.stdout.write("resume")
    //this.resume() // putting this in for some reason causes the stream to not output???
}

出力は正しく送信されますが、背圧が正しく生成されません。バックプレッシャーを適切にサポートするように変更するにはどうすればよいですか?

4

1 に答える 1

4

わかりました、試行錯誤の末にようやくそれを理解しました。いくつかのガイドライン:

  • 一時停止または再開を使用しないでください (そうしないと、従来の「フロー」モードになります)
  • 「データ」イベント リスナーを追加しないでください (そうしないと、従来の「フロー」モードになります)
  • ソースがいつ読み取り可能かを追跡するのは実装者の責任です
  • 宛先がより多くのデータを必要とする時期を追跡するのは実装者の責任です
  • _readメソッドが呼び出されるまで、実装はデータを読み取らないでください。
  • への引数readは、ソースにそのバイト数を与えるように指示します。おそらく、渡された引数をthis._readソースのreadメソッドに渡すのが最善です。このようにして、宛先で一度に読み取る量を構成できるようになり、残りのストリーム チェーンは自動化されます。

だからこれは私がそれを変更したものです:

更新:適切な背圧で実装するのがはるかに簡単で、ノードのネイティブ ストリームと同じくらいの柔軟性を持つ Readable を作成しました。

var Readable = stream.Readable
var util = require('util')

// an easier Readable stream interface to implement
// requires that subclasses:
    // implement a _readSource function that
        // * gets the same parameter as Readable._read (size)
        // * should return either data to write, or null if the source doesn't have more data yet
    // call 'sourceHasData(hasData)' when the source starts or stops having data available
    // calls 'end()' when the source is out of data (forever)
var Stream666 = {}
Stream666.Readable = function() {
    stream.Readable.apply(this, arguments)
    if(this._readSource === undefined) {
        throw new Error("You must define a _readSource function for an object implementing Stream666")
    }

    this._sourceHasData = false
    this._destinationWantsData = false
    this._size = undefined // can be set by _read
}
util.inherits(Stream666.Readable, stream.Readable)
Stream666.Readable.prototype._read = function(size) {
    this._destinationWantsData = true
    if(this._sourceHasData) {
        pushSourceData(this, size)
    } else {
        this._size = size
    }
}
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
    this._sourceHasData = _sourceHasData
    if(_sourceHasData && this._destinationWantsData) {
        pushSourceData(this, this._size)
    }
}
Stream666.Readable.prototype.end = function() {
    this.push(null)
}
function pushSourceData(stream666Readable, size) {
    var data = stream666Readable._readSource(size)
    if(data !== null) {
        if(!stream666Readable.push(data)) {
            stream666Readable._destinationWantsData = false
        }
    } else {
        stream666Readable._sourceHasData = false
    }
}    

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
    // stream - the stream to peek at
    // callback - called when there's data sent from the passed stream
var StreamPeeker = function(myStream, callback) {
    Stream666.Readable.call(this)
    this.stream = myStream
    this.callback = callback

    myStream.on('readable', function() {
        this.sourceHasData(true)
    }.bind(this))
    myStream.on('end', function() {
        this.end()
    }.bind(this))
}
util.inherits(StreamPeeker, Stream666.Readable)
StreamPeeker.prototype._readSource = function(size) {
    var data = this.stream.read(size)
    if(data !== null) {
        this.callback(data)
        return data
    } else {
        this.sourceHasData(false)
        return null
    }
}

古い答え:

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
    // stream - the stream to peek at
    // callback - called when there's data sent from the passed stream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    stream.Readable.call(this)
    this.stream = myStream
    this.callback = callback
    this.reading = false
    this.sourceIsReadable = false

    myStream.on('readable', function() {
        this.sourceIsReadable = true
        this._readMoreData()
    }.bind(this))

    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
    this.reading = true
    if(this.sourceIsReadable) {
        this._readMoreData()
    }
}
StreamPeeker.prototype._readMoreData = function() {
    if(!this.reading) return;

    var data = this.stream.read()
    if(data !== null) {
        if(!this.push(data)) {
            this.reading = false
        }
        this.callback(data)
    }
}
于 2015-03-28T00:06:10.623 に答える