0

私は PIG の速度を上げており、web_log データと 2 つのソースからの株価履歴を組み合わせています。日付/時刻はタイムスタンプに正規化され、銘柄記号で結合が実行されます。タイムスタンプが一致しません。

jnd = JOIN web_time BY w_sym, stock_sort BY group;

このグループには、銘柄固有の株式データのバッグが含まれています。組み合わせたスキーマを次に示します。

jnd: {web_time::ip: chararray,web_time::user: chararray,web_time::w_time: long,web_time::url: chararray,stock_sort::sort: {(sym: chararray,time: long,price: double) }}

web_time::w_time と time を使用して stock_sort バッグをフィルタリングする必要がありますが、完全に一致していません。サンプル JND データは次のようになります。

(14.192.253.226 voriesszing 1213201721000 " get /vlccf.html http /1.0"、{(vlccf、1265361975000、13.84)、(vlccf、12652625625600、14.16)、14.16 14.48)、(VLCCF、1265028034000、14.5)、(VLCCF、1262678148000、13.76)、(VLCCF、1262607761000、13.82(VLCCF、1233832497000、169000、169000、169000、169000、169000、vlccf、1233740569000 ,23.99),(VLCCF, 883720431000 ,23.57)})

$2 の値を使用して、最終的には 1 つのエントリを除くすべてのエントリをフィルタリングする必要がありますが、今のところ、タイムスタンプが小さいタプルを削除しようとしています。

flake = FOREACH jnd {
    fits = FILTER jnd BY (w_time > time);
    GENERATE ip, user, w_time, url, fits;
    }

上記は機能しません。ステップ 1 で、目的の時間 (w_time) よりも小さいタイムスタンプを持つすべての Bag タプルを削除します。w_time はグループの一部ではありません。これには本当にUDFが必要ですか、それとも単純なものが欠けていますか? 私は立ち往生しています。

開発環境

Apache Pig バージョン 0.15.0.2.4.0.0-169 (再エクスポート) 2016 年 2 月 10 日 07:50:04 にコンパイル Hadoop 2.7.1.2.4.0.0-169 4 ノードの Hortonworks クラスター

どんな入力でも大歓迎です。

4

1 に答える 1

0

foreach で、stock_sort::sort をフィルタリングする必要があると思います。JNDではありません。また、フィルタリングは jnd.w_time > time で行う必要があります。なんとかフロー全体を書きました。UDF はありません。下記参照。

次の 2 つのファイルを取得しました。

xact.txt:

VLCCF,1265361975000,13.84
VLCCF,1265262560000,14.16
VLCCF,1265192740000,14.44
VLCCF,1265099390000,14.48
VLCCF,1265028034000,14.5
VLCCF,1262678148000,13.76
VLCCF,1262607761000,13.82
VLCCF,1233832497000,16.9
VLCCF,1233740569000,16.96
VLCCF,884004754000,23.99
VLCCF,883720431000,23.5

ストック.txt

14.192.253.226,voraciouszing,1213201721000,"GET /VLCCF.html HTTP/1.0",VLCCF

stock = load 'stock.txt' using PigStorage(',') as (
ip:chararray,
user:chararray,
w_time:long,
url:chararray,
symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
symbol:chararray,
time:long,
price:double
);

xact_grouped = foreach(group xact by symbol) generate
    group, xact;

joined = join stock by symbol, xact_grouped by group;

filtered = foreach joined {
    grp = filter xact by time < joined.w_time;
    generate ip, grp;
};

dump filtered;

私にくれた

(14.192.253.226,{(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.5)})

編集済み:代わりに

stock = load 'stock.txt' using PigStorage(',') as (
ip:chararray,
user:chararray,
w_time:long,
url:chararray,
symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
symbol:chararray,
time:long,
price:double
);

joined = join stock by symbol, xact by symbol;

joined_filtered = foreach (filter joined by time < w_time) generate
    ip as ip,
    user as user,
    w_time as w_time,
    stock::symbol as symbol,
    time as time,
    price as price;

grouped = foreach (group joined_filtered by (ip, user, w_time)) generate
    flatten(group),
    joined_filtered;
于 2016-04-27T11:53:34.300 に答える