3

2つの大きなファイルで標準の差分を実行できるようにしたいと思います。動作するものがありますが、コマンドラインでの差分ほど速くはありません。

A = load 'A' as (line);
B = load 'B' as (line);
JOINED = join A by line full outer, B by line;
DIFF = FILTER JOINED by A::line is null or B::line is null;
DIFF2 = FOREACH DIFF GENERATE (A::line is null?B::line : A::line), (A::line is null?'REMOVED':'ADDED');
STORE DIFF2 into 'diff';

誰かがこれを行うためのより良い方法を手に入れましたか?

4

1 に答える 1

4

私は次のアプローチを使用します。(私の JOIN アプローチは非常に似ていますが、この方法は複製された行で diff の動作を複製しません)。少し前に尋ねられたように、Pigはレデューサーの数を 0.8 に調整するアルゴリズムを取得したため、レデューサーを 1 つだけ使用していたのではないでしょうか?

  • 私が使用する両方のアプローチは、パフォーマンスが互いに数パーセント以内ですが、重複を同じようには扱いません
  • JOIN アプローチは重複を折りたたみます (したがって、1 つのファイルに他のファイルよりも多くの重複がある場合、このアプローチは重複を出力しません)
  • UNION アプローチは Unix diff(1) ツールのように機能し、正しいファイルに対して正しい数の余分な重複を返します。
  • Unix diff(1) ツールとは異なり、順序は重要ではありません (事実上、JOIN アプローチは実行さsort -u <foo.txt> | diffれ、UNION は実行されます)sort <foo> | diff)
  • 信じられないほどの (~ 数千) の重複行がある場合、結合のために速度が低下します (使用が許可されている場合は、最初に生データに対して DISTINCT を実行します)。
  • 行が非常に長い場合 (サイズが 1KB を超える場合など) は、DataFu MD5 UDF を使用し、ハッシュの違いのみを使用してから、元のファイルと JOIN して元の行を取得してから出力することをお勧めします。

結合の使用:

SET job.name 'Diff(1) Via Join'

-- Erase Outputs
rmf first_only
rmf second_only

-- Process Inputs
a = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS First: chararray;
b = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Second: chararray;

-- Combine Data
combined = JOIN a BY First FULL OUTER, b BY Second;

-- Output Data
SPLIT combined INTO first_raw IF Second IS NULL,
                    second_raw IF First IS NULL;
first_only = FOREACH first_raw GENERATE First;
second_only = FOREACH second_raw GENERATE Second;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();

UNION の使用:

SET job.name 'Diff(1)'

-- Erase Outputs
rmf first_only
rmf second_only

-- Process Inputs
a_raw = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
b_raw = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;

a_tagged = FOREACH a_raw GENERATE Row, (int)1 AS File;
b_tagged = FOREACH b_raw GENERATE Row, (int)2 AS File;

-- Combine Data
combined = UNION a_tagged, b_tagged;
c_group = GROUP combined BY Row;

-- Find Unique Lines
%declare NULL_BAG 'TOBAG(((chararray)\'place_holder\',(int)0))'

counts = FOREACH c_group {
             firsts = FILTER combined BY File == 1;
             seconds = FILTER combined BY File == 2;
             GENERATE
                FLATTEN(
                        (COUNT(firsts) - COUNT(seconds) == (long)0 ? $NULL_BAG :
                            (COUNT(firsts) - COUNT(seconds) > 0 ?
                                TOP((int)(COUNT(firsts) - COUNT(seconds)), 0, firsts) :
                                TOP((int)(COUNT(seconds) - COUNT(firsts)), 0, seconds))
                        )
                ) AS (Row, File); };

-- Output Data
SPLIT counts INTO first_only_raw IF File == 1,
                  second_only_raw IF File == 2;
first_only = FOREACH first_only_raw GENERATE Row;
second_only = FOREACH second_only_raw GENERATE Row;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();

パフォーマンス

  • 18 ノードの LZO 圧縮入力を使用して 200 GB (1,055,687,930 行) を超える差分を作成するには、約 10 分かかります。
  • 各アプローチは、Map/Reduce サイクルを 1 回だけ使用します。
  • これにより、ノードごと、1 分ごとに約 1.8GB の差分が得られます (スループットは高くありませんが、私のシステムではdiff(1) メモリ内でのみ動作するようですが、Hadoop はストリーミング ディスクを利用します。
于 2013-01-10T22:30:15.190 に答える