入力が終了したときに内部状態を出力する必要がある NodeJS ストリーム変換があります。
類推として、受信データを行に分割する変換を考えてみましょう。入力データが終了したら、残っている (改行で終了していない) データはすべて出力する必要があります。
これどうやってするの?
(私は _flush を試しましたがwrite(null)
、変換に入ったときに呼び出されません。)
アップデート:
var Transform = require('stream').Transform;
var util = require('util');
exports.createLinesTransform = createLinesTransform;
function createLinesTransform(options) {
return new LinesTransform(options);
}
function LinesTransform(options) {
options = options ? options : {};
options.objectMode = true;
Transform.call(this, options);
this._buf = '';
this._last_src = undefined;
}
util.inherits(LinesTransform, Transform);
LinesTransform.prototype._transform = function(chunk, encoding, done) {
console.log('chunk', chunk, '_buf', this._buf);
this._buf += chunk.payload;
for (var i = 0; i < this._buf.length; i++) {
if (this._buf.charAt(i) === '\n') {
this.push({src: chunk.src, payload: this._buf.slice(0, i)});
this._last_src = chunk.src;
this._buf = this._buf.slice(i + 1);
}
}
done();
}
// this doesn't get called when the input stream ends
LinesTransform.prototype._flush = function(done) {
console.log('_flush');
this.push({src: this._last_src, payload: this._buf});
done();
}
そしてテスト:
it('should make a working LinesTransform', function() {
var lines = createLinesTransform();
var rxd = [];
lines.on('data', function(data) {
console.log('data', data);
rxd.push(data);
});
var ret = lines.write({src:{},payload:'hel'});
assert.deepEqual([], rxd);
ret = lines.write({src:{},payload:'lo'});
assert.deepEqual([], rxd);
lines.write({src:{},payload:' world!\na second'});
assert.deepEqual([{"src":{},"payload":"hello world!"}], rxd);
lines.write({src:{},payload:'line\n'});
assert.deepEqual([{"src":{},"payload":"hello world!"},
{"src":{},"payload":"a secondline"}],
rxd);
lines.write({src:{},payload:'and some trailing data'});
assert.deepEqual([{"src":{},"payload":"hello world!"},
{"src":{},"payload":"a secondline"}],
rxd);
lines.write(null);
lines.end();
// this last assert fails
assert.deepEqual([{"src":{},"payload":"hello world!"},
{"src":{},"payload":"a secondline"},
{"src":{},"payload":"and some trailing data"}],
rxd);
});