3

私は cassandra を使った経験があまりないので、間違ったアプローチをしていたらすみません。

map reduce を使用して cassandra で一括読み込みを実行しようとしています

基本的に単語数の例

参考: http: //henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

簡単な Hadoop Wordcount Mapper の例を用意し、上記の例のようにドライバー コードとレデューサーを少し変更しました。

出力ファイルも正常に生成されました。今私の疑問は、カサンドラ部分へのロードを実行する方法ですか? 私のアプローチに違いはありますか?

ご意見をお聞かせください。

これはドライバーコードの一部です

 Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);

 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

Mapper は、Word をトークン化して発行するだけの通常の wordcount マッパーと同じです。

レデューサークラスは次の形式です

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
4

1 に答える 1

3

Cassandra への一括読み込みを行うには、DataStax のこの記事を参照することをお勧めします。基本的に、一括読み込みには 2 つのことを行う必要があります。

  • 出力データはネイティブには Cassandra に適合しないため、SSTable に変換する必要があります。
  • SSTable を取得したら、それらを Cassandra にストリーミングできるようにする必要があります。もちろん、各SSTableをすべてのノードに単純にコピーするのではなく、データの関連部分のみを各ノードにコピーする必要があります。

を使用する場合、舞台裏でBulkOutputFormat使用しているため、すべてを行う必要があります。sstableloaderで使用したことはありませんMultipleOutputsが、正常に動作するはずです。

あなたの場合のエラーは、MultipleOutputs正しく使用していないことだと思います。context.write実際にオブジェクトに書き込む必要があるときに、まだ , を実行していMultipleOutputsます。通常の に書き込んでいるため、現在行っている方法では、 で定義したものではなく、Contextのデフォルトの出力形式で取得されます。レデューサーでを使用する方法の詳細については、こちらを参照してください。TextOutputFormatMultipleOutputsMultipleOutputs

定義したような正しい出力形式に書き込むと、BulkOutputFormatSSTable が作成され、クラスター内の各ノードから Cassandra にストリーミングされます。追加の手順は必要ありません。出力形式が自動的に処理します。

また、この投稿を見ることをお勧めします。ここでは の使用方法も説明されていますが、Cassandra エンドポイントをより簡単に構成するために見たいと思うかもしれない をBulkOutputFormat使用しています。ConfigHelper

于 2013-02-05T06:15:27.020 に答える