以下の方法で、Eclipse で Cassandra/Hadoop アプリケーションを開発しています。
Maven (m2e) を使用して、Eclipse プロジェクトの依存関係 (Hadoop、Cassandra、Pig など) を収集して構成する
マッパーとリデューサーをテストするためのテスト ケース (src/test/java のクラス) を作成します。秘訣は、RecordWriter と StatusReporter を拡張する内部クラスを使用して、オンザフライでコンテキスト オブジェクトを構築することです。これを行うと、setup/map/cleanup または setup/reduce/cleanup を呼び出した後に、正しいキーと値のペアとコンテキスト情報がマッパーまたはリデューサーによって書き込まれたと断言できます。mapred と mapreduce の両方のコンテキストのコンストラクターは見苦しく見えますが、クラスのインスタンス化は非常に簡単であることがわかります。
これらのテストを作成すると、ビルドするたびに自動的に呼び出されます。
プロジェクトを選択し、Run --> Maven Test を実行することで、テストを手動で呼び出すことができます。テストはデバッグ モードで呼び出され、マッパーとリデューサーにブレークポイントを設定し、Eclipse でデバッグで実行できるすべての優れた機能を実行できるため、これは非常に便利です。
コードの品質に満足したら、Maven を使用して、hadoop が非常に気に入っている 1 つの jar にすべての依存関係を持つ jar を構築します。
余談ですが、私は Eclipse で M2T JET プロジェクトに基づいて多数のコード生成ツールを作成しました。これらは、上で述べたすべてのインフラストラクチャを生成し、マッパー、リデューサー、およびテスト ケースのロジックを記述するだけです。少し考えてみれば、ほとんど同じことを行うために拡張できる再利用可能なクラスのセットを考え出すことができると思います。
サンプルのテスト ケース クラスを次に示します。
/*
*
* This source code and information are provided "AS-IS" without
* warranty of any kind, either expressed or implied, including
* but not limited to the implied warranties of merchantability
* and/or fitness for a particular purpose.
*
* This source code was generated using an evaluation copy
* of the Cassandra/Hadoop Accelerator and may not be used for
* production purposes.
*
*/
package com.creditco.countwords.ReadDocs;
// Begin imports
import java.io.IOException;
import java.util.ArrayList;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Test;
// End imports
public class ParseDocsMapperTest extends TestCase {
@Test
public void testCount() {
TestRecordWriter recordWriter = new TestRecordWriter();
TestRecordReader recordReader = new TestRecordReader();
TestOutputCommitter outputCommitter = new TestOutputCommitter();
TestStatusReporter statusReporter = new TestStatusReporter();
TestInputSplit inputSplit = new TestInputSplit();
try {
// Begin test logic
// Get an instance of the mapper to be tested and a context instance
ParseDocsMapper mapper = new ParseDocsMapper();
Mapper<LongWritable,Text,Text,IntWritable>.Context context =
mapper.testContext(new Configuration(), new TaskAttemptID(),recordReader,recordWriter,outputCommitter,statusReporter,inputSplit);
// Invoke the setup, map and cleanup methods
mapper.setup(context);
LongWritable key = new LongWritable(30);
Text value = new Text("abc def ghi");
mapper.map(key, value, context);
if (recordWriter.getKeys().length != 3) {
fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Wrong number of records written ");
}
mapper.cleanup(context);
// Validation:
//
// recordWriter.getKeys() returns the keys written to the context by the mapper
// recordWriter.getValues() returns the values written to the context by the mapper
// statusReporter returns the most recent status and any counters set by the mapper
//
// End test logic
} catch (Exception e) {
fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Exception thrown: "+e.getMessage());
}
}
final class TestRecordWriter extends RecordWriter<Text, IntWritable> {
ArrayList<Text> keys = new ArrayList<Text>();
ArrayList<IntWritable> values = new ArrayList<IntWritable>();
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { }
public void write(Text key, IntWritable value) throws IOException, InterruptedException {
keys.add(key);
values.add(value);
}
public Text[] getKeys() {
Text result[] = new Text[keys.size()];
keys.toArray(result);
return result;
}
public IntWritable[] getValues() {
IntWritable[] result = new IntWritable[values.size()];
values.toArray(result);
return result;
}
};
final class TestRecordReader extends RecordReader<LongWritable, Text> {
public void close() throws IOException { }
public LongWritable getCurrentKey() throws IOException, InterruptedException {
throw new RuntimeException("Tried to call RecordReader:getCurrentKey()");
}
public Text getCurrentValue() throws IOException, InterruptedException {
throw new RuntimeException("Tried to call RecordReader:getCurrentValue()");
}
public float getProgress() throws IOException, InterruptedException {
throw new RuntimeException("Tried to call RecordReader:getProgress()");
}
public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { }
public boolean nextKeyValue() throws IOException, InterruptedException {
return false;
}
};
final class TestStatusReporter extends StatusReporter {
private Counters counters = new Counters();
private String status = null;
public void setStatus(String arg0) {
status = arg0;
}
public String getStatus() {
return status;
}
public void progress() { }
public Counter getCounter(String arg0, String arg1) {
return counters.getGroup(arg0).findCounter(arg1);
}
public Counter getCounter(Enum<?> arg0) {
return null;
}
};
final class TestInputSplit extends InputSplit {
public String[] getLocations() throws IOException, InterruptedException {
return null;
}
public long getLength() throws IOException, InterruptedException {
return 0;
}
};
final class TestOutputCommitter extends OutputCommitter {
public void setupTask(TaskAttemptContext arg0) throws IOException { }
public void setupJob(JobContext arg0) throws IOException { }
public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
return false;
}
public void commitTask(TaskAttemptContext arg0) throws IOException { }
public void cleanupJob(JobContext arg0) throws IOException { }
public void abortTask(TaskAttemptContext arg0) throws IOException { }
};
}
ここにサンプルのmaven pomがあります。参照されているバージョンは少し古いことに注意してください。ただし、これらのバージョンがどこかの Maven リポジトリに保持されている限り、このプロジェクトをビルドできます。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.creditco</groupId>
<artifactId>wordcount.example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>1.0.6</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>1.0.1.1</version>
<type>jar</type>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>hamcrest-all</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.9.1</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
</dependencies>
</project>