Cassandra DB からキースペースとテーブルを作成/削除するには、Cassandra セッションが必要です。Spark アプリケーションで Cassandra セッションを作成するには、SparkConf を CassandraConnector に渡す必要があります。Spark 2.0 では、以下のように実行できます。
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
既存のデータフレームがある場合はDataFrameFunctions.createCassandraTable(Df)
、同様に Cassandra でテーブルを作成できます。API の詳細については、こちらを参照してください。
以下のように、spark-cassandra-connector が提供する api を使用して、Cassandra DB からデータを読み取ることができます。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show();
以下のように、SparkSession.sql() メソッドを使用して、spark cassandra コネクタによって返された Dataframe で作成された一時テーブルに対してクエリを実行できます。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();