あなたはこれを行うことができます:
myudf.py
@outputSchema('bagofnums: {(num:int)}')
def merge_distinct(b1, b2):
out = []
for ignore, n in b1:
out.append(n)
for ignore, n in b2:
out.append(n)
return out
スクリプト.豚
register 'myudf.py' using jython as myudf ;
A = LOAD 'foo.in' USING PigStorage(' ') AS (num: int, link: int) ;
-- Essentially flips A
B = FOREACH A GENERATE link AS num, num AS link ;
-- We need to union the flipped A with A so that we will know:
-- 3 links to 7
-- 7 links to 3
-- Instead of just:
-- 3 links to 7
C = UNION A, B ;
-- C is in the form (num, link)
-- You can't do JOIN C BY link, C BY num ;
-- So, T just is C replicated
T = FOREACH D GENERATE * ;
D = JOIN C BY link, T BY num ;
E = FOREACH (FILTER E BY $0 != $3) GENERATE $0 AS num, $3 AS link_hopped ;
-- The output from E are (num, link) pairs where the link is one hop away. EG
-- 1 links to 2
-- 2 links to 4
-- 3 links to 7
-- The output will be:
-- 1 links to 4
F = COGROUP C BY num, E BY num ;
-- I use a UDF here to merge the bags together. Otherwise you will end
-- up with a bag for C (direct links) and E (links one hop away).
G = FOREACH F GENERATE group AS num, myudf.merge_distinct(C, E) ;
G
サンプル入力を使用するためのスキーマと出力:
G: {num: int,bagofnums: {(num: int)}}
(1,{(2),(4)})
(2,{(4),(1),(5)})
(3,{(7)})
(4,{(5),(2),(1)})
(5,{(4),(2)})
(7,{(3)})