HBase テーブルをマッピングして、HBase 行ごとに 1 つの RDD 要素を生成しています。ただし、行に不適切なデータが含まれている場合があり (解析コードで NullPointerException をスローする)、その場合はスキップしたいだけです。
最初のマッパーにを返させOption
て、0 または 1 要素を返すことを示し、次に をフィルタリングしてSome
、含まれている値を取得します。
// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
map( tuple => getData(tuple._2) ).
filter( {case Some(y) => true; case None => false} ).
map( _.get ).
// ... more RDD operations with the good data
def getData(r: Result) = {
val key = r.getRow
var id = "(unk)"
var x = -1L
try {
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
Some( ( id, ( List(x),
// more stuff ...
) ) )
} catch {
case e: NullPointerException => {
logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
None
}
}
}
これを行うためのより短い慣用的な方法はありますか? getData()
これは、map.filter.map
私がやっているダンスでも、かなり乱雑に見えるように感じます.
おそらく aflatMap
は機能する可能性があります ( a で 0 または 1 個のアイテムを生成しますSeq
) が、 map 関数で作成しているタプルを平坦化したくありません。空を削除するだけです。