0

大量のウェブログデータがあります。セッション化する必要があります。また、セッションごとに前のドメインと次のドメインを生成する必要があります。AWSEMRのインタラクティブなジョブフローを介してテストしています。

現在、次のコードを使用してデータをセッション化することができます:http: //goo.gl/L52Wf。UDFのコンパイルと使用に慣れるのに少し手間がかかりましたが、私はそれまでにそれを達成しました。

入力ファイルのヘッダー行と最初の行(タブ区切り)は次のとおりです。

ID  Date    Rule code   Project UID respondent_uid  Type    Tab ID  URL domain  URL path    Duration    Exit cause  Details
11111111    2012-09-25T11:21:20.000Z    20120914_START_USTEST   20120914_TESTSITE_US_TR test6_EN_9  PAGE_VIEWED FF1348568479042 http://www.google.fr        11  OTHER   

これはリレーションからのタプルSESSIONSです(リレーションを取得する手順を以下に示します)。

(2012-09-27 04:42:20.000,11999603,20120914_URL_ALL,20120914_TESTSITE_US_TR,2082810875_US_9,PAGE_VIEWED,CH17,http://hotmail.com,_news/2012/09/26/14113684,28,WINDOW_DEACTIVATED,,3019222a-5c4d-4767-a82e-2b4df5d9db6d)

これは、テストデータをセッション化するために現在実行しているものとほぼ同じです。

register s3://TestBucket/Sessionize.jar

define Sessionize datafu.pig.sessions.Sessionize('30m');

A = load 's3://TestBucket/party2.gz' USING PigStorage() as (id: chararray, data_date: chararray, rule_code: chararray, project_uid: chararray, respondent_uid: chararray, type: chararray, tab_id: chararray, url_domain: chararray, url_path: chararray, duration: chararray, exit_cause: chararray, details: chararray);

B = foreach A generate $1, $0, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11;

C = filter B by id neq 'ID';

VIEWS = group C by (respondent_uid, url_domain);

SESSIONS = foreach VIEWS { VISITS = order C by data_date; generate FLATTEN(Sessionize(VISITS)) as (data_date: chararray, id: chararray, rule_code: chararray, project_uid: chararray, respondent_uid: chararray, type: chararray, tab_id: chararray, url_domain: chararray, url_path: chararray, duration: chararray, exit_cause: chararray, details: chararray, session_id); }

(Bでの手順は、日付を最初の位置に移動することです。Cでの手順は、ファイルヘッダーをフィルターで除外することです)

ここからこれを進める正しい方向まで迷っています。

豚のスクリプトとのSESSIONS関係を繰り返して、次のドメインと前のドメインを取得できますか?カスタムUDFを作成し、それに関係をforeach渡す方がよいでしょうか?SESSIONS(私自身のUDFを書くことは冒険になるでしょう!..)

アドバイスをいただければ幸いです。誰かがやるべきでないことを勧めることができとしても、同じように役立つかもしれないので、私はジャンクアプローチを研究する時間を無駄にしません。私はHadoopとpigスクリプトにまったく慣れていないので、これは間違いなく私の得意分野の1つではありません(まだ..)。

4

1 に答える 1

0

誰かが以下の解決策を改善できたとしても、私はまったく驚かないでしょうが、それは私の状況ではうまくいきます。以下のUDFを作成するためのリファレンスとして、sessionize UDF(私の質問で言及)を使用しました。

import java.io.IOException;
import java.util.ArrayList;
import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class PreviousNext extends EvalFunc<DataBag> implements Accumulator<DataBag>
{

    private DataBag outputBag;
    private String previous;
    private String next;

    public PreviousNext()
    {
        cleanup();
    }

    @Override
    public DataBag exec(Tuple input) throws IOException 
    {   
        accumulate(input);
        DataBag outputBag = getValue();
        cleanup();

        return outputBag;
    }

    @Override
    public void accumulate(Tuple input) throws IOException 
    {
        ArrayList<String> domains = new ArrayList<String>();

        DataBag d = (DataBag)input.get(0);

        //put all domains into ArrayList to allow for
        //accessing specific indexes
        for(Tuple t : d)
        {
            domains.add((String)t.get(2));
        }

        //add empty string for "next domain" value for last iteration
        domains.add("");

        int i = 0;

        previous = "";

        for(Tuple t : d)
        {   
            next = domains.get(i+1);

            Tuple t_new = TupleFactory.getInstance().newTuple(t.getAll());

            t_new.append(previous);
            t_new.append(next);

            outputBag.add(t_new);

            //current domain is previous for next iteration
            previous = domains.get(i);

            i++;
        }

    }

    @Override
    public void cleanup() 
    {
        this.outputBag = BagFactory.getInstance().newDefaultBag();

    }

    @Override
    public DataBag getValue() 
    {
        return outputBag;
    }


    @Override
    public Schema outputSchema(Schema input)
      {
        try 
        {
          Schema.FieldSchema inputFieldSchema = input.getField(0);

          if (inputFieldSchema.type != DataType.BAG)
          {
            throw new RuntimeException("Expected a BAG as input");
          }

          Schema inputBagSchema = inputFieldSchema.schema;

          if (inputBagSchema.getField(0).type != DataType.TUPLE)
          {
            throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s", DataType.findTypeName(inputBagSchema.getField(0).type)));
          }

          Schema inputTupleSchema = inputBagSchema.getField(0).schema;

          Schema outputTupleSchema = inputTupleSchema.clone();

          outputTupleSchema.add(new Schema.FieldSchema("previous_domain", DataType.CHARARRAY));

          outputTupleSchema.add(new Schema.FieldSchema("next_domain", DataType.CHARARRAY));

          return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),outputTupleSchema,DataType.BAG));
        }
        catch (CloneNotSupportedException e) 
        {
          throw new RuntimeException(e);
        }

        catch (FrontendException e) 
        {
          throw new RuntimeException(e);
        }
      }


}
于 2012-11-13T17:08:43.273 に答える