2

Hadoop用の並列scanLeft(結合演算子のプレフィックス合計を計算する)関数を作成したいと思います(特にスケーリング。これがどのように行われるかについては、以下を参照してください)。

hdfsファイル内の数列(1行に1つ)が与えられた場合、連続する偶数/奇数のペアの合計を使用して新しい数列を計算したいと思います。例えば:

入力シーケンス:

0、1、2、3、4、5、6、7、8、9、10

出力シーケンス:

0 + 1、2 + 3、4 + 5、6 + 7、8 + 9、10

すなわち

1,5,9,13,17,10

これを行うには、Hadoop用のInputFormatクラスとInputSplitsクラスを作成する必要があると思いますが、これを行う方法がわかりません。

こちらのセクション3.3をご覧ください。以下はScalaのアルゴリズムの例です。

 // for simplicity assume input length is a power of 2

def scanadd(input : IndexedSeq[Int]) : IndexedSeq[Int] =   
if (input.length == 1)
  input 
else { 
//calculate a new collapsed sequence which is the sum of sequential even/odd pairs 
val collapsed = IndexedSeq.tabulate(input.length/2)(i => input(2 * i) + input(2*i+1))

//recursively scan collapsed values
val scancollapse = scanadd(collapse)

//now we can use the scan of the collapsed seq to calculate the full sequence

val output = IndexedSeq.tabulate(input.length)(
i => i.evenOdd match {             

//if an index is even then we can just look into the collapsed sequence and get the value
// otherwise we can look just before it and add the value at the current index

   case Even => scancollapse(i/2) 
   case Odd => scancollapse((i-1)/2) + input(i)  
}

output
}

Hadoopとうまく連携するには、かなりの最適化が必要になる可能性があることを理解しています。これを直接翻訳すると、かなり非効率的なHadoopコードにつながると思います。たとえば、明らかにHadoopではIndexedSeqを使用できません。私はあなたが見る特定の問題をいただければ幸いです。とはいえ、おそらくうまく機能させることができると思います。

4

2 に答える 2

0

これは、InputFormatとRecordReaderを作成するために私が見つけた最高のチュートリアルでした。分割全体を1つのArrayWritableレコードとして読み取ることになりました。

于 2013-01-11T21:14:54.183 に答える
0

余計。このコードのことですか?

val vv = (0 to 1000000).grouped(2).toVector
vv.par.foldLeft((0L, 0L, false))((a, v) => 
    if (a._3) (a._1, a._2 + v.sum, !a._3) else (a._1 + v.sum, a._2, !a._3))
于 2013-01-05T01:30:22.843 に答える