次のようなことを行う必要があります (たとえば、典型的な従業員テーブルを想定しています)。
JobConf conf = new JobConf(getConf(), MyDriver.class);
conf.setInputFormat(DBInputFormat.class);
DBConfiguration.configureDB(conf, “com.mysql.jdbc.Driver”, “jdbc:mysql://localhost/mydatabase”); String [] fields = { “employee_id”, "name" };
DBInputFormat.setInput(conf, MyRecord.class, “employees”, null /* conditions */, “employee_id”, fields);
...
// other necessary configuration
JobClient.runJob(conf);
configureDB()
とのsetInput()
呼び出しで を設定しDBInputFormat
ます。最初の呼び出しでは、使用する JDBC ドライバーの実装と、接続するデータベースを指定します。2 番目の呼び出しは、データベースからロードするデータを指定します。MyRecord クラスは Java でデータが読み込まれるクラスで、"employees" は読み込むテーブルの名前です。「employee_id」パラメーターは、結果の順序付けに使用されるテーブルの主キーを指定します。以下のセクション「InputFormat の制限」では、これが必要な理由を説明しています。最後に、fields 配列は、読み取るテーブルの列をリストします。の定義をオーバーロードsetInput()
すると、代わりに任意の SQL クエリを指定して読み取ることができます。
configureDB()
とを呼び出した後setInput()
、残りのジョブを通常どおりに構成し、Mapper クラスと Reducer クラスを設定し、読み取る他のデータ ソース (HDFS のデータセットなど) やその他のジョブ固有のパラメーターを指定する必要があります。
Writable
次のような独自の実装を作成する必要があります (id と name をテーブル フィールドと見なします)。
class MyRecord implements Writable, DBWritable {
long id;
String name;
public void readFields(DataInput in) throws IOException {
this.id = in.readLong();
this.name = Text.readString(in);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.id = resultSet.getLong(1);
this.name = resultSet.getString(2); }
public void write(DataOutput out) throws IOException {
out.writeLong(this.id);
Text.writeString(out, this.name); }
public void write(PreparedStatement stmt) throws SQLException {
stmt.setLong(1, this.id);
stmt.setString(2, this.name); }
}
その後、マッパーは DBWritable 実装のインスタンスを入力値として受け取ります。入力キーは、データベースによって提供される行 ID です。ほとんどの場合、この値を破棄します。
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> {
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
// Use val.id, val.name here
output.collect(new LongWritable(val.id), new Text(val.name));
}
}
詳細については、次のリンクを参照してください (私の回答の実際のソース): http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/