2

JavaRDDをmysqlデータベースに保存する方法がapache sparkにあるかどうか誰か教えてください。2 つの csv ファイルから入力を取得し、その内容に対して結合操作を行った後、出力 (出力 JavaRDD) を mysql データベースに保存する必要があります。出力を hdfs に正常に保存できましたが、Apache Spark-MYSQL 接続に関連する情報が見つかりません。以下に、spark sql のコードを掲載しています。これは、spark-sql の例を探している人の参考になるかもしれません。

package attempt1;

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;


public class Spark_Mysql {
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }


      }

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {

          JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
          JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

          JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                  new Function<String, CompleteSample>() {
                    public CompleteSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      CompleteSample cs = new CompleteSample();
                      cs.setASSETNUM(parts[0]);
                      cs.setASSETTAG(parts[1]);
                      cs.setCALNUM(parts[2]);

                      return cs;
                    }
                  });

          JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                  new Function<String, ExtendedSample>() {
                    public ExtendedSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      ExtendedSample es = new ExtendedSample();
                      es.setASSETNUM(parts[0]);
                      es.setCHANGEBY(parts[1]);
                      es.setCHANGEDATE(parts[2]);

                      return es;
                    }
                  });

          JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
            complete.registerAsTable("cs");

          JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
          extended.registerAsTable("es");

          JavaSchemaRDD fs= sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");


          JavaRDD<String> result = fs.map(new Function<Row, String>() {
              public String call(Row row) {
                return row.getString(0);
              }
            });

              result.saveAsTextFile("hdfs://path/to/hdfs/dir-name");          //instead of hdfs I need to save it on mysql database, but I am not able to find any Spark-MYSQL connection

    }



}

最後に、結果を HDFS に正常に保存しています。しかし、今は MYSQL データベースに保存したいと思っています。親切に私を助けてください。ありがとう

4

2 に答える 2

4

結果をデータベースに書き戻す方法は 2 つあります。1 つは、DBOutputFormat のようなものを使用して構成することです。もう 1 つは、保存する RDD で foreachPartition を使用し、MySQL への接続を作成して結果を書き戻す関数を渡すことです。

于 2014-07-23T18:32:41.127 に答える
0

DBOutputFormat を使用した例を次に示します。

テーブルの行を表すクラスを作成します -

public class TableRow implements DBWritable
{
    public String column1;
    public String column2;

    @Override
    public void write(PreparedStatement statement) throws SQLException
    {
        statement.setString(1, column1);
        statement.setString(2, column2);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException
    {
        throw new RuntimeException("readFields not implemented");
    }
}

次に、ジョブを構成し、mapToPair 関数を記述します。この値は使用されていないようです。知ってる人いたらコメントお願いします。

String tableName = "YourTableName";
String[] fields = new String[] { "column1", "column2" };

JobConf job = new JobConf();
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/DatabaseNameHere", "username", "password");
DBOutputFormat.setOutput(job, tableName, fields);

// map your rdd into a table row
JavaPairRDD<TableRow, Object> rows = rdd.mapToPair(...);

rows.saveAsHadoopDataset(job);
于 2015-01-20T02:19:43.927 に答える