0

入力が終了したときに内部状態を出力する必要がある NodeJS ストリーム変換があります。

類推として、受信データを行に分割する変換を考えてみましょう。入力データが終了したら、残っている (改行で終了していない) データはすべて出力する必要があります。

これどうやってするの?

(私は _flush を試しましたがwrite(null)、変換に入ったときに呼び出されません。)

アップデート:

var Transform = require('stream').Transform;
var util = require('util');

exports.createLinesTransform = createLinesTransform;

function createLinesTransform(options) {
  return new LinesTransform(options);
}

function LinesTransform(options) {
  options = options ? options : {};
  options.objectMode = true;
  Transform.call(this, options);
  this._buf = '';
  this._last_src = undefined;
}
util.inherits(LinesTransform, Transform);

LinesTransform.prototype._transform = function(chunk, encoding, done) {
  console.log('chunk', chunk, '_buf', this._buf);
  this._buf += chunk.payload;
  for (var i = 0; i < this._buf.length; i++) {
    if (this._buf.charAt(i) === '\n') {
      this.push({src: chunk.src, payload: this._buf.slice(0, i)});
      this._last_src = chunk.src;
      this._buf = this._buf.slice(i + 1);
    }
  }
  done();
}

// this doesn't get called when the input stream ends
LinesTransform.prototype._flush = function(done) {
  console.log('_flush');
  this.push({src: this._last_src, payload: this._buf});
  done();
}

そしてテスト:

  it('should make a working LinesTransform', function() {
    var lines = createLinesTransform();
    var rxd = [];
    lines.on('data', function(data) {
      console.log('data', data);
      rxd.push(data);
    });

    var ret = lines.write({src:{},payload:'hel'});
    assert.deepEqual([], rxd);
    ret = lines.write({src:{},payload:'lo'});
    assert.deepEqual([], rxd);
    lines.write({src:{},payload:' world!\na second'});
    assert.deepEqual([{"src":{},"payload":"hello world!"}], rxd);
    lines.write({src:{},payload:'line\n'});
    assert.deepEqual([{"src":{},"payload":"hello world!"},
                      {"src":{},"payload":"a secondline"}],
                     rxd);
    lines.write({src:{},payload:'and some trailing data'});
    assert.deepEqual([{"src":{},"payload":"hello world!"},
                      {"src":{},"payload":"a secondline"}],
                     rxd);
    lines.write(null);
    lines.end();
    // this last assert fails
    assert.deepEqual([{"src":{},"payload":"hello world!"},
                      {"src":{},"payload":"a secondline"},
                      {"src":{},"payload":"and some trailing data"}],
                     rxd);
  });
4

1 に答える 1