0

Highland.js を使用してワークフローを設計しようとしています。Highland.js をどのように使用できるかわかりません。

以下のようなストリームベースのワークフロー(疑似コード)があります。

read                      //fs.createReadStream(...)
   .pipe(parse)           //JSONStream.parse(...)
   .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0
   .pipe(transform)       //fn(item) { return tranform(item); }
   .pipe(write);          //mongoClient.db.collection.insert(doc)

filterDuplicate はデータベースを検索して、(条件を使用して) 読み取りレコードが存在するかどうかを確認し、ブール値の結果を返します。フィルターが機能するには、ストリームが完了するまで再利用したいアクティブな DB 接続が必要です。1 つの方法は、読み取りの前に接続を開き、書き込みの「終了」イベントで閉じることです。これは、両方のメソッドが同じデータベースを使用する場合に機能する、フィルターと書き込みのパラメーターとして接続を渡す必要があることを意味します。

上記のワークフローでは、filterDuplicate と write も異なるデータベースを使用する場合があります。したがって、接続が各機能内に含まれて管理されることを期待します。これにより、自己完結型の再利用可能なユニットになります。

Highland を使用してこれをどのように設計できるかについての情報を探しています。

ありがとう。

4

1 に答える 1

0

pipeたくさんの回数を使用するほど簡単ではありません。タスクに最も適した API メソッドを使用する必要があります。

これは、おそらく最終的に近いものになる大まかな例です。

read
  .through(JSONStream.parse([true]))
  .through((x) => {
    h((next, push) => { // use a generator for async operations
      h.wrapCallback( mongoCountQuery )( params ) // you don't have to do it this way
        .collect()
        .toCallback((err, result) => {
          if ( result > 0 ) push( err, x ); // if it met the criteria, hold onto it
          return push( null, h.nil ); // tell highland this stream is done
        });
    });
  })
  .merge() // because you've got a stream of streams after that `through`
  .map(transform) // just your standard map through a transform
  .through((x) => {
    h((next, push) => { // another generator for async operations
      h.wrapCallback( mongoUpdateQuery )( params )
        .toCallback((err, results) => {
          push( err, results );
          return push( null, h.nil );
        });
    });
  })
  .merge() // another stream-of-streams situation
  .toCallback( cb ); // call home to say we're done
于 2016-06-14T03:51:54.797 に答える