3

関数で、使用後に 2 つの DStream を返す方法はありfilterますか? たとえば、 a をフィルタリングすると、フィルタリングされたものDStreamは a に保存されDStream、フィルタリングされていないものは別の に保存されDStreamます。

4

1 に答える 1

4

ビルトインならもっと効率的にできるのですが、

def partition[A](stream: DStream[A])(pred: A => Boolean) {
  val stream1 = stream.map(x => (x, pred(x)).cache()
  val good = stream1.filter(_._2).map(_._1)
  val bad = stream1.filter(!_._2).map(_._1)
  (good, bad)
}

が1 回だけ計算されるように注意cache()する必要があります。stream1十分predに単純で、streamすでにキャッシュされている場合は、(stream.filter(pred), stream.filter(x => !pred(x)))より高速になるはずです。

于 2016-04-21T13:29:15.943 に答える