Highland.js を使用してワークフローを設計しようとしています。Highland.js をどのように使用できるかわかりません。
以下のようなストリームベースのワークフロー(疑似コード)があります。
read //fs.createReadStream(...)
.pipe(parse) //JSONStream.parse(...)
.pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0
.pipe(transform) //fn(item) { return tranform(item); }
.pipe(write); //mongoClient.db.collection.insert(doc)
filterDuplicate はデータベースを検索して、(条件を使用して) 読み取りレコードが存在するかどうかを確認し、ブール値の結果を返します。フィルターが機能するには、ストリームが完了するまで再利用したいアクティブな DB 接続が必要です。1 つの方法は、読み取りの前に接続を開き、書き込みの「終了」イベントで閉じることです。これは、両方のメソッドが同じデータベースを使用する場合に機能する、フィルターと書き込みのパラメーターとして接続を渡す必要があることを意味します。
上記のワークフローでは、filterDuplicate と write も異なるデータベースを使用する場合があります。したがって、接続が各機能内に含まれて管理されることを期待します。これにより、自己完結型の再利用可能なユニットになります。
Highland を使用してこれをどのように設計できるかについての情報を探しています。
ありがとう。