5

HBase テーブルをマッピングして、HBase 行ごとに 1 つの RDD 要素を生成しています。ただし、行に不適切なデータが含まれている場合があり (解析コードで NullPointerException をスローする)、その場合はスキップしたいだけです。

最初のマッパーにを返させOptionて、0 または 1 要素を返すことを示し、次に をフィルタリングしてSome、含まれている値を取得します。

// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
  map( tuple => getData(tuple._2) ).
  filter( {case Some(y) => true; case None => false} ).
  map( _.get ).
  // ... more RDD operations with the good data

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  try {
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    Some( ( id, ( List(x),
          // more stuff ...
        ) ) )
  } catch {
    case e: NullPointerException => {
      logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
      None
    }
  }
}

これを行うためのより短い慣用的な方法はありますか? getData()これは、map.filter.map私がやっているダンスでも、かなり乱雑に見えるように感じます.

おそらく aflatMapは機能する可能性があります ( a で 0 または 1 個のアイテムを生成しますSeq) が、 map 関数で作成しているタプルを平坦化したくありません。空を削除するだけです。

4

3 に答える 3

7

getDataaを返すように変更するとscala.util.Try、変換を大幅に簡素化できます。このようなものはうまくいくかもしれません:

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  val tr = util.Try{
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    ( id, ( List(x)
          // more stuff ...
     ) )
  } 

  tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
  tr
}

次に、変換を次のように開始できます。

myRDD.
  flatMap(tuple => getData(tuple._2).toOption)

Tryが の場合、ビアFailureに変換され、ロジックの一部として削除されます。その時点で、変換の次のステップは、ラッピングなしで返される基になる型が何であれ、成功したケースでのみ機能します (つまり、 No ) 。NonetoOptionflatMapgetDataOption

于 2015-03-17T17:06:57.603 に答える
2

データを削除しても問題ない場合は、そのまま使用できますmapPartitions。以下にサンプルを示します。

import scala.util._
val mixedData = sc.parallelize(List(1,2,3,4,0))
mixedData.mapPartitions(x=>{
  val foo = for(y <- x)
   yield {
    Try(1/y)
  }
  for{goodVals <- foo.partition(_.isSuccess)._1}
   yield goodVals.get
})

悪い値を確認したい場合は、これまでとaccumulator同じように または を使用できます。

コードは次のようになります。

val output = myRDD.
  mapPartitions( tupleIter => getCleanData(tupleIter) )
  // ... more RDD operations with the good data

def getCleanData(iter: Iter[???]) = {
  val triedData = getDataInTry(iter)
  for{goodVals <- triedData.partition(_.isSuccess)._1}
    yield goodVals.get
}

def getDataInTry(iter: Iter[???]) = {
  for(r <- iter) yield {
    Try{
      val key = r._2.getRow
      var id = "(unk)"
      var x = -1L
      id = Bytes.toString(key, 0, 11)
      x = Long.MaxValue - Bytes.toLong(key, 11)
      // ... more code that might throw exceptions
    }
  }
}
于 2015-03-17T17:46:10.967 に答える