0

Hadoop プロセスで、MySQL を入力として設定しようとしています。バージョン 1.0.3 で Hadoop - MySQL 接続に DBInputFormat クラスを使用するには? hadoop-1.0.3/docs/api/ からの JobConf によるジョブの構成は機能しません。

// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

FileInputFormat.setInputPaths(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setCombinerClass(MyJob.MyReducer.class);
job.setReducerClass(MyJob.MyReducer.class);

job.setInputFormat(SequenceFileInputFormat.class);
job.setOutputFormat(SequenceFileOutputFormat.class);
4

2 に答える 2

0

次のようなことを行う必要があります (たとえば、典型的な従業員テーブルを想定しています)。

JobConf conf = new JobConf(getConf(), MyDriver.class);
    conf.setInputFormat(DBInputFormat.class); 
    DBConfiguration.configureDB(conf, “com.mysql.jdbc.Driver”, “jdbc:mysql://localhost/mydatabase”); String [] fields = { “employee_id”, "name" };
    DBInputFormat.setInput(conf, MyRecord.class, “employees”, null /* conditions */, “employee_id”, fields); 
    ...
    // other necessary configuration
    JobClient.runJob(conf); 

configureDB()とのsetInput()呼び出しで を設定しDBInputFormatます。最初の呼び出しでは、使用する JDBC ドライバーの実装と、接続するデータベースを指定します。2 番目の呼び出しは、データベースからロードするデータを指定します。MyRecord クラスは Java でデータが読み込まれるクラスで、"employees" は読み込むテーブルの名前です。「employee_id」パラメーターは、結果の順序付けに使用されるテーブルの主キーを指定します。以下のセクション「InputFormat の制限」では、これが必要な理由を説明しています。最後に、fields 配列は、読み取るテーブルの列をリストします。の定義をオーバーロードsetInput()すると、代わりに任意の SQL クエリを指定して読み取ることができます。

configureDB()とを呼び出した後setInput()、残りのジョブを通常どおりに構成し、Mapper クラスと Reducer クラスを設定し、読み取る他のデータ ソース (HDFS のデータセットなど) やその他のジョブ固有のパラメーターを指定する必要があります。

Writable次のような独自の実装を作成する必要があります (id と name をテーブル フィールドと見なします)。

class MyRecord implements Writable, DBWritable { 
    long id; 
    String name; 

    public void readFields(DataInput in) throws IOException { 
        this.id = in.readLong(); 
        this.name = Text.readString(in); 
        } 

    public void readFields(ResultSet resultSet) throws SQLException { 
        this.id = resultSet.getLong(1); 
        this.name = resultSet.getString(2); } 

    public void write(DataOutput out) throws IOException { 
        out.writeLong(this.id); 
        Text.writeString(out, this.name); } 

    public void write(PreparedStatement stmt) throws SQLException { 
        stmt.setLong(1, this.id); 
        stmt.setString(2, this.name); } 
    } 

その後、マッパーは DBWritable 実装のインスタンスを入力値として受け取ります。入力キーは、データベースによって提供される行 ID です。ほとんどの場合、この値を破棄します。

public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> { 
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { 
// Use val.id, val.name here 
output.collect(new LongWritable(val.id), new Text(val.name)); 
} 
} 

詳細については、次のリンクを参照してください (私の回答の実際のソース): http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/

于 2012-12-13T19:42:31.867 に答える
0

この投稿を見てください。Map Reduce から MySQL データベースにデータをシンクする方法を示します。

于 2012-12-17T06:03:33.643 に答える