4

歪んだデータ セットがあり、group by 操作を実行してから、ネストされた foreach を実行する必要があります。データが歪んでいるため、時間がかかるレデューサーはほとんどなく、時間がかからないレデューサーもあります。歪んだ結合が存在することは知っていますが、group by と foreach には何がありますか? これが私の豚のコードです(変数の名前を変更しました):

foo_grouped = GROUP foo_grouped by FOO;
FOO_stats = FOREACH foo_grouped 
{ 
a_FOO_total = foo_grouped.ATTR; 
a_FOO_total = DISTINCT a_FOO_total; 

bar_count = foo_grouped.BAR; 
bar_count = DISTINCT bar_count; 

a_FOO_type1 = FILTER foo_grouped by COND1=='Y';
a_FOO_type1 = a_FOO_type1.ATTR; 
a_FOO_type1 = DISTINCT a_FOO_type1;

a_FOO_type2 = FILTER foo_grouped by COND2=='Y' OR COND3=='HIGH'; 
a_FOO_type2 = a_FOO_type2.ATTR; 
a_FOO_type2 = DISTINCT a_FOO_type2; 

generate group as FOO, 
COUNT(a_FOO_total) as a_FOO_total, COUNT(a_FOO_type1) as a_FOO_type1, COUNT(a_FOO_type2)     as a_FOO_type2, COUNT(bar_count) as bar_count; }
4

2 に答える 2

9

あなたの例では、レデューサーで実行される FOREACH 内にネストされた DISTINCT 演算子が多数あり、RAM に依存して一意の値を計算し、このクエリは 1 つのジョブのみを生成します。グループ内の一意の要素が多すぎる場合は、メモリ関連の例外も発生する可能性があります。

幸いなことに、PIG Latin はデータフロー言語であり、一種の実行計画を記述します。より多くの CPU を利用するために、並行して実行できる MapReduce ジョブをより多く強制するようにコードを変更できます。そのためには、ネストされた DISTINCT を使用せずにクエリを書き直す必要があります。秘訣は、個別の操作を行い、列が 1 つしかないかのようにグループ化し、結果をマージすることです。これは非常に SQL に似ていますが、機能します。ここにあります:

records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;

grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;

grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;

grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;

mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out into '....' USING PigStorage(',');

実行後、最初のジョブによって処理されたデータのスキューが個別の操作に影響を与えなかったことを示す次の要約を取得しました。

Job Stats (time in seconds):
      JobId            Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201206061712_0244   669     45      75      8       13      376     18      202     grouped_a,grouped_b,grouped_c,grouped_d,records,selected        DISTINCT,MULTI_QUERY
job_201206061712_0245   1       1       3       3       3       12      12      12      grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246   1       1       3       3       3       12      12      12      grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247   5       1       48      27      33      30      30      30      grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248   1       1       3       3       3       12      12      12      grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249   4       1       3       3       3       12      12      12      mrg,out HASH_JOIN       ...,
Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."

Output(s):
Successfully stored 9 records (181 bytes) in: "..."

Job DAG から、groupby 操作が並行して実行されたことがわかります。

Job DAG:
job_201206061712_0244   ->      job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248   ->      job_201206061712_0249,
job_201206061712_0246   ->      job_201206061712_0249,
job_201206061712_0247   ->      job_201206061712_0249,
job_201206061712_0245   ->      job_201206061712_0249,
job_201206061712_0249

グループ キー値の 1 つ (列 g) がデータの 95% を占めるデータセットでは問題なく動作します。また、メモリ関連の例外も取り除きます。

于 2012-06-29T16:50:38.090 に答える
0

最近、この結合でエラーが発生しました。グループに null がある場合、リレーション全体が削除されます。

于 2012-09-28T04:10:30.460 に答える