0

オブジェクトのストリームを取り、それらをmongodbデータベースに入力する書き込み可能なストリームをコーディングしようとしています。オブジェクトのストリームを消費する前に、最初にデータベース接続が確立されるのを待つ必要がありますが、プログラムが挿入部分に到達しないため、何か間違っているようです。

// ./mongowriter.js

let mongo = mongodb.MongoClient,
    connectToDb = _.wrapCallback(mongo.connect);

export default url => _.pipeline(s => {
  return connectToDb(url).flatMap(db => {
    console.log('Connection established!');
    return s.flatMap(x => /* insert x into db */);
  });
});

....

// Usage in other file
import mongowriter from './mongowriter.js';

let objStream = _([/* json objects */]);

objStream.pipe(mongoWriter);

プログラムは「接続が確立されました!」なしで終了します。コンソールに書き込まれます。

私は何が欠けていますか?私が従うべきイディオムはありますか?

4

1 に答える 1

0

ソースを読み、いくつかの一般的な実験を行うことで、単一の非同期処理を実行し、ストリームを介して処理を続行する方法を見つけました。基本的に、flatMap を使用して、非同期タスクからのイベントを実際に処理したいストリームに置き換えます。

私が期待していなかった別の癖は_.pipeline、元のストリームがコールバックで完全に消費されない限り機能しないことでした。そのため、単に _.map とログを入れるだけでは機能しません (これが私がデバッグしようとした方法です)。代わりに、最後にeachorがあることを確認する必要があります。done以下は最小限の例です。

export default _ => _.pipeline( stream => {
  return _(promiseReturningFunction())
    .tap(_ => process.stdout.write('.'))
    .flatMap(_ => stream)
    .each(_ => process.stdout.write('-'));
});

// Will produce something like the following when called with a non-empty stream.
// Note the lone '.' in the beginning.
// => .-------------------

基本的に、「.」非同期関数が完了すると出力され、ストリームのすべてのオブジェクトに対して「-」が出力されます。

うまくいけば、これで誰かの時間を節約できます。これを理解するのに恥ずかしいほど時間がかかりました。^^

于 2016-03-08T18:10:42.923 に答える