次のシナリオを想像してみてください。Spark アプリケーション (Java 実装) は、Cassandra データベースを使用してロード、RDD への変換、およびデータの処理を行っています。また、アプリケーションは、カスタム レシーバーによって処理されるデータベースからの新しいデータをストリーミングしています。ストリーミング プロセスの出力は、データベースに格納されます。実装は、データベースとの統合から Spring Data Cassandra を使用しています。
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception {
return new CassandraTemplate(session().getObject());
}
}
DataProcessor.main メソッド:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
Event lastEvent = pagingResults.get(pagingResults.size() - 1);
pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize page and add to the existing
rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
初期ロードには大量のデータが含まれることが予想されます。このため、データはページ分割され、ロードされ、rddBuffer に配布されます。
次のオプションも利用できます。
- Spark-Cassandra の例 ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala )。最小限のドキュメントしかありません。この例では。
- Calliope プロジェクト ( http://tuplejump.github.io/calliope/ )
Spark と Cassandra を統合するためのベスト プラクティスを教えてください。私の実装で従うべき最良のオプションは何ですか?
アパッチ スパーク 1.0.0、アパッチ カサンドラ 2.0.8