3

現在、Apache Crunch を評価しています。簡単なWordCount MapReduce ジョブの例に従いました。その後、結果をスタンドアロンの HBase に保存しようとしました。ここで説明されているように、HBase が実行されています (jps および HBase シェルでチェック): http://hbase.apache.org/book/quickstart.html

ここで、HBase への書き込みの例を採用します。

Pipeline pipeline = new MRPipeline(WordCount.class,getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
PTable<String,Long> counts = noStopWords.count();
pipeline.write(counts, new HBaseTarget("wordCountOutTable");
PipelineResult result = pipeline.done();

「例外:java.lang.illegalArgumentException: HBaseTarget は Put と Delete のみをサポートしています」という例外が発生します。

何が悪かったのか手がかりはありますか?

4

1 に答える 1

3

PTable は PCollection かもしれませんが、HBaseTarget は Put または Delete オブジェクトしか処理できません。したがって、PTable を、コレクションのすべての要素が Put または Delete のいずれかである PCollection に変換する必要があります。これが行われているCrunch-Examplesを見てください。

変換の例は次のようになります。

 public PCollection<Put> createPut(final PTable<String, String> counts) {
   return counts.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
     @Override
     public void process(final Pair<String, String> input, final Emitter<Put> emitter) {
       Put put;
       // input.first is used as row key
       put = new Put(Bytes.toBytes(input.first())); 
       // the value (input.second) is added with its family and qualifier
       put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second())); 
       emitter.emit(put);
     }
   }, Writables.writables(Put.class));
 }
于 2015-01-06T06:07:02.363 に答える