私のプロジェクトの一環として、非常に大きな Cassandra データセット用の SQL クエリ インターフェイスを作成する必要があるため、Spark を使用して cassandra 列ファミリーで SQL クエリを実行するためのさまざまな方法を検討しており、3 つの異なる方法を考え出しました。
静的に定義されたスキーマで Spark SQLContext を使用する
// statically defined in the application public static class TableTuple implements Serializable { private int id; private String line; TableTuple (int i, String l) { id = i; line = l; } // getters and setters ... }
定義を次のように使用します。
SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", CASSANDRA_HOST) .setJars(jars); SparkContext sc = new SparkContext(HOST, APP_NAME, conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<CassandraRow> rowrdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); JavaRDD<TableTuple> rdd = rowrdd.map(row -> new TableTuple(row.getInt(0), row.getString(1))); DataFrame dataFrame = sqlContext.createDataFrame(rdd, TableTuple.class); dataFrame.registerTempTable("lines"); DataFrame resultsFrame = sqlContext.sql("Select line from lines where id=1"); System.out.println(Arrays.asList(resultsFrame.collect()));
動的に定義されたスキーマで Spark SQLContext を使用する
SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", CASSANDRA_HOST) .setJars(jars); SparkContext sc = new SparkContext(HOST, APP_NAME, conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<CassandraRow> cassandraRdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); JavaRDD<Row> rdd = cassandraRdd.map(row -> RowFactory.create(row.getInt(0), row.getString(1))); List<StructField> fields = new ArrayList<>(); fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("line", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema); dataFrame.registerTempTable("lines"); DataFrame resultDataFrame = sqlContext.sql("select line from lines where id = 1"); System.out.println(Arrays.asList(resultDataFrame.collect()));
spark-cassandra-connector から CassandraSQLContext を使用する
SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", CASSANDRA_HOST) .setJars(jars); SparkContext sc = new SparkContext(HOST, APP_NAME, conf); CassandraSQLContext sqlContext = new CassandraSQLContext(sc); DataFrame resultsFrame = sqlContext.sql("Select line from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_COLUMN_FAMILY + " where id = 1"); System.out.println(Arrays.asList(resultsFrame.collect()));
ある方法と別の方法の長所/短所を知りたいです。また、CassandraSQLContext
メソッドについては、クエリが CQL に限定されているか、Spark SQL と完全に互換性があります。また、私の特定のユース ケースに関する分析もお願いします。62 列の約 1,760 万のタプルを持つ cassandra 列ファミリーがあります。このような大規模なデータベースを照会するには、どの方法が最も適切ですか?