6

したがって、入力データには id1 と id2 の 2 つのフィールド/列があり、コードは次のとおりです。

TextLine(args("input"))
.read
.mapTo('line->('id1,'id2)) {line: String =>
    val fields = line.split("\t")
        (fields(0),fields(1))
}
.groupBy('id2){.size}
.write(Tsv(args("output")))

出力結果は、(私が想定する) 2 つのフィールドになります: id2 * サイズ。id2 とグループ化された id1 の値を保持し、それを別のフィールドとして追加することが可能かどうかを調べることに少し行き詰まっていますか?

4

1 に答える 1

8

あなたはこれを良い方法で行うことはできません。内部でどのように機能するかを考えてみてください。カウントされるデータをチャンクに分割し、異なるプロセスに送信します。各プロセスはチャンクをカウントし、最後に単一のレデューサーがそれらをすべて追加します。各プロセスがカウントしている間、全体のサイズがわからないため、フィールドを追加できません。唯一の方法は、全体のサイズがわかっている場合 (つまり、結合) に戻ってデータに追加することです。

各グループがメモリに収まる場合 (およびメモリを構成できる場合)、次のことができます。

Tsv(args("input"), ('id1, 'id2))
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list))
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) {
  case (list, size) => list.map(record => (record._1, record._2, size))
}
.write(Tsv(args("output")))

ただし、システムに十分なメモリがない場合は、高価な結合を使用する必要があります。

備考: TextLine の代わりに Tsv を使用し、その後に mapTo と分割を続けることができます。

于 2013-09-09T14:06:36.830 に答える