3

Apache Flink では、1 つの主キーで 2 つのデータ セットを結合すると、各データ セットから対応するデータ セット エントリを含むタプル 2 が得られます。

問題は、map()結果のタプル 2 データ セットにメソッドを適用するときに、特に両方のデータ セットのエントリに多数の機能がある場合に、見栄えがよくないことです。

両方の入力データ セットでタプルを使用すると、次のようなコードが得られます。

var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */

val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
  .map(join => (join._1._1, join._1._2, join._1._3,
                    join._1._4, join._1._5, join._2._4))

POJO やケース クラスを使用してもかまいませんが、これがどのように改善されるかはわかりません。

質問 1:そのタプル 2 を平坦化する良い方法はありますか? たとえば、別の演算子を使用します。

質問 2:同じキーで 3 つのデータ セットの結合を処理する方法は? サンプルソースがさらに厄介になります。

助けてくれてありがとう。

4

1 に答える 1

6

たとえば、結合された要素の各ペアに結合関数を直接適用できます。

val leftData: DataSet[(String, Int, Int)] = ...
val rightData: DataSet[(String, Int)] = ...
val joined: DataSet[(String, Int, Int)] = leftData
      .join(rightData).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3 + r._2) ) }

2 番目の質問に答えるために、Flink はバイナリ結合のみを処理します。しかし、Flink のオプティマイザは、関数の動作についてヒントを与えると、不要なシャッフルを回避できます。Forward Field アノテーションは、特定のフィールド (結合キーなど) が結合関数によって変更されていないことをオプティマイザーに伝え、既存のパーティション分割と並べ替えを再利用できるようにします。

于 2015-06-11T18:24:28.250 に答える