2

Java クライアント 'HECTOR' を使用して、Cassandra に保存されているデータに対して単純な map-reduce ジョブを実行しようとしています。

この美しいブログ投稿で説明されている hadoop-wordcount の例は、既に正常に実行されています。Hadoop サポートの記事も読みました。

しかし、私がやりたいことは、実装に関しては少し異なります (wordcount の例では、mapreduce-site.xml に言及しているスクリプトを使用しています)。Cassandra データで「HECTOR」からローカルではなく、分散モードで map-reduce ジョブを実行する方法を誰かに理解してもらいたいです。

私のコードは、ローカル モードで map-reduce ジョブを正常に実行します。しかし、私が望むのは、それらを分散モードで実行し、結果を新しい ColumnFamily として cassandra キースペースに書き込むことです。

分散モードで実行するには、これをどこかに設定する必要があるかもしれませんが (上記のブログ投稿で述べたように)、どこにあるのか
$PATH_TO_HADOOP/conf/mapred-site.xml
わかりません。

これが私のコードです

public  class test_forum implements Tool {

private String KEYSPACE = "test_forum";
private String COLUMN_FAMILY ="posts";
private String OUTPUT_COLUMN_FAMILY = "output_post_count";
private static String CONF_COLUMN_NAME = "text";


public int run(String[] strings) throws Exception {

    Configuration conf = new Configuration();

    conf.set(CONF_COLUMN_NAME, "text");
    Job job = new Job(conf,"test_forum");

    job.setJarByClass(test_forum.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(ReducerToCassandra.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);

    job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);


    System.out.println("Job Set");


    ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

    ConfigHelper.setInputColumnFamily(job.getConfiguration(),KEYSPACE,COLUMN_FAMILY);
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);

    SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));

    ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate);

    System.out.println("running job now..");

    boolean success = job.waitForCompletion(true);

    return success ? 0:1;  //To change body of implemented methods use File | Settings | File Templates.

}



public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private ByteBuffer sourceColumn;
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
    throws IOException, InterruptedException
    {
        sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
    }

    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
    {



        IColumn column = columns.get(sourceColumn);

        if (column == null)  {
            return;
        }

        String value = ByteBufferUtil.string(column.value());
        System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

        StringTokenizer itr = new StringTokenizer(value);

        while (itr.hasMoreTokens())
        {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }


}

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    private ByteBuffer outputKey;

    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;

        byte[] keyBytes = word.getBytes();
        outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length));


        for (IntWritable val : values)
            sum += val.get();

        System.out.println(word.toString()+" -> "+sum);
        context.write(outputKey, Collections.singletonList(getMutation(word, sum)));

    }

    private static Mutation getMutation(Text word, int sum)
    {
        Column c = new Column();
        c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
        c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
        c.setTimestamp(System.currentTimeMillis());

        Mutation m = new Mutation();
        m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        m.column_or_supercolumn.setColumn(c);
        System.out.println("Mutating");
        return m;

    }

}




public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException {

    System.out.println("Working..!");

    int ret=ToolRunner.run(new Configuration(), new test_forum(), args);

    System.out.println("Done..!");

    System.exit(ret);

}

}

ここに私が得る警告があります:

WARN  - JobClient                  - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN  - JobClient                  - No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

しかし、コードは map-reduce タスクを実行して正常に実行されますが、どこにデータを書き込むのかわかりません。

編集: 出力用に cassandra で columnFamily を作成していませんでした。したがって、それは書いていませんでした。したがって、残っている唯一の問題は、分散モードで実行する方法です。

ありがとうございました。

4

1 に答える 1

2

クラスでjarを作成しましたか?

Hadoop では、ジョブ クラスをクラスタ全体に伝播できるようにするために jar が必要です。そうでない場合は、「No job jar file set」エラーと、分散モードで実行できない理由が説明されています。「hadoop jar ...」コマンドでジョブを起動し、jar 依存関係 (少なくとも apache-cassandra!) を追加するように注意してください。ジョブを送信するときは、cassandra サーバーが稼働していて、thrift ポートをリッスンしている必要があります。

ちなみに、Hadoop と Cassandra には Hector は必要ありません。(ColumnFamilyInputFormatおよびColumnFamilyOutputFormat) は、独自に Cassandra にデータを読み取る (および書き込む) 方法になりました。そのためRpcPort、 , InitialAdressandを構成する必要がPartionnerあります (そして、あなたはそれを行いました)。

最後の注意: はColumnFamilyOutputFormat出力列ファミリーを作成しません。すでに存在している必要があります。そうでない場合、書き込み時にエラーが発生します。

お役に立てれば、

ブノワ

于 2012-04-18T08:32:36.813 に答える