これを行うには間違いなく mapreduce フレームワークに違反していますが、やらなければならないことをやらなければなりません!
まず、ソートはキーでのみ行われます。したがって、値が任意の順序になると想定する必要があります。したがって、XXID、Identifier、および TimeStamp をすべてまとめて取得する方法を理解する必要があります。(おそらく NullWriteable を値として使用できます)
3 つの項目をキーに合わせるには、WriteableComparableを実装して新しいデータ型を作成する必要があります。この新しいクラスに 3 つの値をラップさせ、それを と呼びましょうJavanxTriple
。
JavanxTriple
MapReduce の種類の項目をカスタマイズする方法は、.compareTo
関数をComparableから変更することです。最初に XXID を比較し、次に 1 または 2、次にタイムスタンプを比較するようにします。
次に、これらのそれぞれが個別のキーであるため、デフォルトでデータが異なるレデューサーに送られるという問題を解決する必要があります。そのままでは、必要なデータ ストリームを計算することはできません。この問題を回避するには、カスタムパーティショナーを作成する必要があります。パーティショナーは、各レコードがどのレデューサーに移動するかを通知します。これを行うには、オーバーライドします.getPartition
。を計算するときは.getPartition
、XXID のみを使用してこの番号を決定します (キーの識別子とタイムスタンプの部分ではありません)。つまり、同じ XXID を持つすべてのアイテムが同じレデューサーに送信されます。
最後に、レデューサーを実装する方法が一般的ではないという問題があります。reduce はキーごとに 1 回だけ呼び出され、渡される Iterable には NullWriteable のみが含まれます。
これを回避するには、Reducer クラスでいくつかの静的変数を使用して、reduce 関数で何が起こっているかを追跡します。XXID がいつ変化したかを検出して、次の分析に切り替える必要があります。セットアップとクリーンアップのメソッドを使用して、物事をセットアップして仕上げる必要がある場合があります。