5

dataerror、およびイベントをリッスンして処理するストリームがありend、関数を呼び出しdataて最初のストリームの各イベントを処理します。当然、データを処理する関数は他のコールバックを呼び出し、非同期にします。では、ストリーム内のデータが処理されているときに、さらにコードの実行を開始するにはどうすればよいでしょうか? ストリームでイベントをリッスンしても、非同期処理関数が終了したendことにはなりません。data

次のステートメントを実行するときに、ストリーム データ処理関数が確実に終了するようにするにはどうすればよいですか?

次に例を示します。

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var self = this;
  var promises = [];
  accountStream
    .on('data', function (account) {
      migrateAccount.bind(self)(account, finishMigration);
    })
    .on('error', function (err) {
      return console.log(err);
    })
    .on('end', function () {
      console.log("Finished updating account stream (but finishMigration is still running!!!)");
      callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running!
    });
}

var migrateAccount = function (oldAccount, callback) {
  executeSomeAction(oldAccount, function(err, newAccount) {
    if (err) return console.log("error received:", err);
    return callback(newAccount);
  });
}

var finishMigration = function (newAccount) {
  // some code that is executed asynchronously...
}

callThisOnlyAfterAllAccountsAreMigratedストリームが処理された後に呼び出されるようにするにはどうすればよいですか?

これは約束で行うことができますか?スルーストリームで実行できますか? 私は Nodejs を使用しているので、他の npm モジュールを参照すると役立つ場合があります。

4

3 に答える 3

2

endあなたが言ったように、ストリームでイベントをリッスンするだけでは役に立ちません。ストリームは、ハンドラー内のデータでユーザーが何をしているのかを認識していないか、気にしていないdataため、独自の migrateAccount 状態を追跡するコードを記述する必要があります。

私だったら、このセクション全体を書き直します。readableストリームでイベントを使用すると.read()、処理したいアイテムを一度にいくつでも読むことができます。それがあれば問題ありません。30歳ならスゴイ。これを行う理由は、ストリームからのデータで何をしていてもオーバーランしないようにするためです。現在のところ、accountStream が高速であれば、アプリケーションは間違いなくどこかでクラッシュします。

ストリームからアイテムを読み取って作業を開始するときは、戻ってきた約束を取り (Bluebird などを使用)、それを配列にスローします。promise が解決されたら、配列から削除します。.done()ストリームが終了したら、ハンドラーをアタッチします.all()(基本的に、まだ配列内にあるすべてのプロミスから 1 つの大きなプロミスを作成します)。

進行中のジョブの単純なカウンターを使用することもできます。

于 2015-05-26T18:50:30.100 に答える
1

プロミスを介してストリームを処理すると、はるかに簡単になります。

ここからコピーされた、 spexライブラリを使用する例:

var spex = require('spex')(Promise);
var fs = require('fs');

var rs = fs.createReadStream('values.txt');

function receiver(index, data, delay) {
    return new Promise(function (resolve) {
        console.log("RECEIVED:", index, data, delay);
        resolve(); // ok to read the next data;
    });
}

spex.stream.read(rs, receiver)
    .then(function (data) {
        // streaming successfully finished;
        console.log("DATA:", data);
    }, function (reason) {
        // streaming has failed;
        console.log("REASON:", reason);
    });
于 2015-10-20T00:42:02.587 に答える