1

助けが必要で、これは非常に一般的なタスクのように思えます。さまざまなイベントを含む 1 時間ごとの巨大なログファイルがあります。ハードコーディングされた方法で、ハイブを使用してこれらのイベントを異なるファイルに分割しています。

from events
  insert overwrite table specificevent1
   where events.event_type='specificevent1'
  insert overwrite table specificevent2
   where events.event_type='specificevent2'
...;

追加する新しいイベントごとにコードを変更する必要があるため、これは問題です。

動的パーティショニングを使用して自動解析を実行しようとしていますが、問題が発生しています。

  1. 私のパーティション スキーマの場合/year/month/day/hour/event、1 日を超えるパーティションを回復することはできません。これは、1 か月の数が ~ (30 日) (24 時間) (100~ イベント)=~72k になり、処理するには多すぎるためです。
  2. 私のスキーマがevent/year/month/day/hourイベントが動的部分であるため、次のパーティションが動的としてスクリプト化されるため、パーティションの数が増えるにつれて分割に時間がかかります。

これを行うためのより良い方法はありますか (Hive および非 Hive ソリューション)?

4

1 に答える 1

0

これが他の人に役立つことを願っています...

ログファイルを多数の異なるファイル (event_type ごとのファイル) に分割する場合、Hive は適していないことがわかりました。Hive によって提供される動的パーティションには、IMHO の制限が多すぎます。

私がやったのは、カスタムのmap-reduce jarを書くことです。また、古い Hadoop インターフェイスは、generateFileNameForKeyValue() を実装できる MultipleTextOutputFormat 抽象クラスを提供するため、はるかに適していることもわかりました。(新しい Hadoop は、別の複数の出力ファイル メカニズムを提供します: MultipleOutputs は、事前定義された出力場所がある場合に最適ですが、キー値からその場でそれらを取得する方法を取得できませんでした)

コード例:

\*
Run example:
hadoop jar DynamicSplit.jar DynamicEventSplit.DynamicEventSplitMultifileMapReduce /event/US/incoming/2013-01-01-01/ event US 2013-01-01-01 2 "[a-zA-Z0-9_ ]+" "/event/dynamicsplit1/" ","
*/
package DynamicEventSplit;

import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class DynamicEventSplitMultifileMapReduce
{
        static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>  
        {
            private String event_name;
            private String EventNameRegexp;
            private int EventNameColumnNumber;
            private String columndelimeter=",";

            public void configure(JobConf job)
            {
                EventNameRegexp=job.get("EventNameRegexp");
                EventNameColumnNumber=Integer.parseInt(job.get("EventNameColumnNumber"));
                columndelimeter=job.get("columndelimeter");
            }
            public void map(LongWritable key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
            {
                //check that expected event_name field exists  
                String [] dall=value.toString().split(columndelimeter);
                if (dall.length<EventNameColumnNumber)
                {
                    return;
                }
                event_name=dall[EventNameColumnNumber-1];
                //check that expected event_name is valid  
                if (!event_name.matches(EventNameRegexp))
                {
                    return;
                }
                output.collect(new Text(dall[1]),value);
            }
        }

        static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
        {
            public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
            {
                    while (values.hasNext()) 
                    {
                        output.collect(key, values.next());
                    }
            }
        }


        static class MultiFileOutput extends MultipleTextOutputFormat<Text, Text> 
        {
            private String event_name;
            private String site;
            private String event_date;
            private String year;
            private String month;
            private String day;
            private String hour;
            private String basepath;


            public RecordWriter<Text,Text> getRecordWriter(FileSystem fs, JobConf job,String name, Progressable arg3) throws IOException
            {
                RecordWriter<Text,Text> rw=super.getRecordWriter(fs, job, name, arg3);
                site=job.get("site");
                event_date=job.get("date");
                year=event_date.substring(0,4);
                month=event_date.substring(5,7);
                day=event_date.substring(8,10);
                hour=event_date.substring(11,13);
                basepath=job.get("basepath");
                return rw;
            }

            protected String generateFileNameForKeyValue(Text key, Text value,String leaf) 
            {
                event_name=key.toString();
                return basepath+"event="+event_name+"/site="+site+"/year="+year+"/month="+month+"/day="+day+"/hour="+hour+"/"+leaf;
            }

            protected Text generateActualKey(Text key, Text value) 
            {
                return null;
            }
        }

        public static void main(String[] args) throws Exception 
        {
                String InputFiles=args[0];
                String OutputDir=args[1];
                String SiteStr=args[2];
                String DateStr=args[3];
                String EventNameColumnNumber=args[4];
                String EventNameRegexp=args[5];
                String basepath=args[6];
                String columndelimeter=args[7];

                Configuration mycon=new Configuration();
                JobConf conf = new JobConf(mycon,DynamicEventSplitMultifileMapReduce.class);
                conf.set("site",SiteStr);
                conf.set("date",DateStr);

                conf.setOutputKeyClass(Text.class);
                conf.setMapOutputKeyClass(Text.class);
                conf.setOutputValueClass(Text.class);

                conf.setMapperClass(Map.class);
                conf.setReducerClass(Reduce.class);

                conf.setInputFormat(TextInputFormat.class);
                conf.setOutputFormat(MultiFileOutput.class);

                conf.setMapSpeculativeExecution(false);
                conf.setReduceSpeculativeExecution(false);

                FileInputFormat.setInputPaths(conf,InputFiles);
                FileOutputFormat.setOutputPath(conf,new Path("/"+OutputDir+SiteStr+DateStr+"/"));

                conf.set("EventNameColumnNumber",EventNameColumnNumber);
                conf.set("EventNameRegexp",EventNameRegexp);
                conf.set("basepath",basepath);
                conf.set("columndelimeter",columndelimeter);

                JobClient.runJob(conf);
        }
}
于 2013-05-29T15:35:06.343 に答える