1

mongo からのストリーミング クエリがあり、それを through2 の「スパイ」書き込み可能ストリームにパイプしています。5 つのドキュメントの小さなコレクションを含む「終了」コールバックを含め、完全に機能します。ただし、344 個のドキュメントのより大きなコレクションでは、最初の 15 個のみが通過し、その後永久にハングアップし、「終了」イベントは発生しません。MCVE は次のとおりです。

var spy = require("through2-spy").obj;
var MongoClient = require("mongodb").MongoClient;

function getStream() {
  var stream = spy(function() {
    console.log("@bug counting", stream.total++);
  });
  stream.total = 0;
  return stream;
}

function onEnd() {
  console.log("ended");
}

MongoClient.connect(process.argv[2], function(error, db) {
  if (error) {
    console.error(error);
    return;
  }
  var stream = db.collection(process.argv[3]).find().stream();
  stream
    // behavior is the same with the follow line commented out or not
    .on("end", db.close.bind(db))
    .on("error", console.error)
    .on("end", onEnd)
    .pipe(getStream());
});
4

2 に答える 2

1

問題は、through2-spyデフォルトhighWaterMarkで 16 を使用することにあります。フロー制御を処理するために、ストリームは、データがストリームから消費されるときにクリアされる内部バッファを維持します。によって返された変換ストリームからのデータを消費する読み取り可能なストリームがないためgetStream、内部バッファがいっぱいになり、highWaterMark. を増やすとhighWaterMark修正されるはずです:

var stream = spy({highWaterMark: 350}, function() {
  console.log("@bug counting", stream.total++);
});

別の非標準の代替手段は、変換ストリームの読み取り可能な状態をリセットすることです。

var stream = spy(function() {
    console.log("@bug counting", stream.total++);
    this._readableState.length = 0;
});
于 2015-03-22T16:19:36.900 に答える
0

これを解決する別の方法は、上流のソースを完全に読み取って完了する何かを下流に確保することです。ストリームの最後に余分なものを追加すること.pipe(terminus.devnull({objectMode: true});になり、それもうまくいきました.

var MongoClient = require("mongodb").MongoClient;
var spy = require("through2-spy").obj;
var terminus = require("terminus");

function getStream() {
  var stream = spy(function() {
    console.log("@bug counting", stream.total++);
  });
  stream.total = 0;
  return stream;
}

function onEnd() {
  console.log("ended");
}

MongoClient.connect(process.argv[2], function(error, db) {
  if (error) {
    console.error(error);
    return;
  }
  var stream = db.collection(process.argv[3]).find().stream();
  stream
    // behavior is the same with the follow line commented out or not
    .on("end", db.close.bind(db))
    .on("error", console.error)
    .on("end", onEnd)
    .pipe(getStream())
    .pipe(terminus.devnull({objectMode: true}));
});
于 2015-03-23T01:24:34.747 に答える