1

マッパー内からCassandra列ファミリーにアクセスするにはどうすればよいですか?具体的には、map()メソッドへの引数を私が期待するJavaタイプに戻すにはどうすればよいですか?

キー{logType}->{列名:timeUUID、列値:csvログ行、ttl:1年}


@ Chris&@rs_atlに感謝します

私はhadoopジョブを正常に実行しました。完全なコードは次のとおりです。

package com.xxx.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.SortedMap;


import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TBaseHelper;

import com.xxx.parser.LogParser;
import com.netflix.astyanax.serializers.StringSerializer;

public class LogTypeCounterByDate extends Configured implements Tool {
    private static final String KEYSPACE = "LogKS";
    private static final String COLUMN_FAMILY = "LogBlock";
    private static final String JOB_NAME = "LOG_LINE_COUNT";
    private static final String INPUT_PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";
    private static final String INPUT_RPC_PORT = "9160";
    private static final String INPUT_INITIAL_ADDRESS = "192.168.1.21";
    private static final String OUTPUT_PATH = "/logOutput/results";

    @Override
    public int run(String[] args) throws Exception {

        //Configuration conf = new Configuration();

        Job job = new Job(getConf(), JOB_NAME);
        job.setJarByClass(LogTypeCounterByDate.class);
        job.setMapperClass(LogTypeCounterByDateMapper.class);       
        job.setReducerClass(LogTypeCounterByDateReducer.class);

        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        ConfigHelper.setRangeBatchSize(getConf(), 1000);

        /*SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1));*/
        SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1000);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);


        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADDRESS);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        ToolRunner.run(new Configuration(), new LogTypeCounterByDate(), args);
        System.exit(0);
    }


    public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
    {

        @SuppressWarnings("rawtypes")
        @Override
        protected void setup(Mapper.Context context){

        }

        @SuppressWarnings({ })
        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException{
            //String[] lines = columns.;
            String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key));  
            Iterator<ByteBuffer> iter = columns.keySet().iterator();
            IColumn column;
            String line;
            LogParser lp = null;

            while(iter.hasNext()){
                column = columns.get(iter.next());
                line = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(column.value()));
                lp = new LogParser(line);               
                context.write(new Text(rowkey + "\t" + "LineCount"), new LongWritable(1L));
                context.write(new Text(rowkey + "\t" + "Minutes"), new LongWritable(lp.getTotalDuration()));
            }
        }
    }

    public static class LogTypeCounterByDateReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {           

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{
            long total = 0;
            for(LongWritable val : values){
                total += val.get();
            }
            context.write(key, new LongWritable(total));
        }
    }               
}

ConfigHelper.setRangeBatchSize(getConf(), 1000);

        /*SlicePredicate predicate = new   SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1));*/
        SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1000);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);

上記のコードは、各行のマッパーに1000列のみをフィードしますが、毎回1000列のバッチですべての行のすべての列をフィードします。

親切に誰かがこれで私を助けてくれます。

4

1 に答える 1

4

与えられたパラメータ:

ByteBuffer key;
SortedMap<ByteBuffer, IColumn> columns;

使用するもの:

String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key))

デシリアライズされたキー値を取得します。ここでの前提は、行キーが文字列であることに注意してください。他のタイプの場合は、適切なシリアライザークラスを使用する必要があります。

列の値を取得するには、次のようにします。

Iterator<ByteBuffer> = columns.keySet().iterator(); 
while (iter.hasNext()) {
    IColumn col = columns.get(iter.next()); 
    xxx colVal = xxxSerializer.get().fromByteBuffer(TBaseHelper.rightSize(col.value()));
}

ここで、xxxは列値のJavaタイプであり、xxxSerializerは対応するシリアライザーです。

ちなみに、TBaseHelperクラスは、内部バイト配列の値のオフセットをゼロに修正するために使用され、シリアライザーの実装によって行われた仮定を強制します。

つまり、もう1つ...時系列を取得する場合、各列は独自の時系列値であり、反復内に適切なマッパーロジック(ある種の数学演算やコンテキストへの書き込みなど)を含める必要があります。列をループします。代わりに、より静的な列ファミリー(従来のSQLテーブルのようなもの)がある場合は、おそらく行全体のコンテキストへの単一の書き込みがあります。

于 2012-11-16T14:09:24.610 に答える