0

私は node.js から大きな価値を得ており、ストリーム処理モデルが大好きです。私は主に、データ エンリッチメントと ETL のようなジョブを使用したスト​​リーム処理に使用しています。

充実のために、こんな記録もあるかもしれません…

{ "ip":"123.45.789.01", "productId": 12345 }

おそらく製品の詳細を追加して、これを充実させたいと思います

{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }

説明のデータと価格のデータは、どちらも別々のストリームから取得されます。ハイランドでそのような依存関係にアプローチする最良の方法は何ですか?

H = require('highland')

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)

#  the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json',{flags:'r',encoding:'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k for testing
  .filter((line)-> line.trim().length > 0) # to prevent barfing on empty lines
  .doto((v)->
    # here i want to add the decription from the descriptionStream
    # and i want to add the price from the price stream.
    # in order to do that, i need to make the execution of this
    # stream dependent on the completion of the first two and
    # availability of that data.  this is easy with declarative
    # programming but less intuitive with functional programming
  )
  .toArray((results)->
    # dump my results here
  )

何かご意見は?

4

2 に答える 2

0

highland.js を使用している場合は、.map各項目を変更するために関数を使用および指定できます。

例えば

var stream = _([{ "ip":"123.45.789.01", "productId": 12345 }]).map(function (x) {
   x.productName = 'Coca-Cola 12 Pack'
   return x;
});
于 2015-08-19T21:10:02.010 に答える