8

次のシナリオを想像してみてください。Spark アプリケーション (Java 実装) は、Cassandra データベースを使用してロード、RDD への変換、およびデータの処理を行っています。また、アプリケーションは、カスタム レシーバーによって処理されるデータベースからの新しいデータをストリーミングしています。ストリーミング プロセスの出力は、データベースに格納されます。実装は、データベースとの統合から Spring Data Cassandra を使用しています。

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main メソッド:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

初期ロードには大量のデータが含まれることが予想されます。このため、データはページ分割され、ロードされ、rddBuffer に配布されます。

次のオプションも利用できます。

  1. Spark-Cassandra の例 ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala )。最小限のドキュメントしかありません。この例では。
  2. Calliope プロジェクト ( http://tuplejump.github.io/calliope/ )

Spark と Cassandra を統合するためのベスト プラクティスを教えてください。私の実装で従うべき最良のオプションは何ですか?

アパッチ スパーク 1.0.0、アパッチ カサンドラ 2.0.8

4

2 に答える 2

9

Cassandra と Spark を操作する最も簡単な方法は、DataStax によって開発された Spark 用の公式オープン ソース Cassandra ドライバーを使用することです: https://github.com/datastax/spark-cassandra-connector

このドライバーは、Cassandra Java Driver の上に構築されており、Cassandra と Spark の間の直接的なブリッジを提供します。Calliope とは異なり、Hadoop インターフェイスを使用しません。さらに、次の独自の機能を提供します。

  • コレクションを含むすべての Cassandra データ型をすぐにサポート
  • Scala の暗黙的機能やその他の高度な機能を使用する必要のない、Cassandra 行のカスタム クラスまたはタプルへの軽量マッピング
  • RDD を Cassandra に保存する
  • Cassandra 仮想ノードの完全サポート
  • Cassandra クラスタリング列またはセカンダリ インデックスを利用するなど、サーバー側でフィルタリング/選択する機能
于 2014-06-30T19:36:23.953 に答える
1

上記のコードのアプローチは、1 つのノードで実行された場合にのみ機能する従来の集中型アルゴリズムです。Cassandra と Spark はどちらも分散システムであるため、多数のノード間で分散できるようにプロセスをモデル化する必要があります。

可能なアプローチはいくつかあります: フェッチする行のキーがわかっている場合は、次のような単純なことを行うことができます: (DataStax Java Driver を使用)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

これにより、キーのフェッチが Spark クラスター全体に効果的に分散されます。クロージャー内で C* への接続がどのように行われるかに注意してください。これにより、分散された各ワーカーでタスクが実行されるときに接続が確立されることが保証されます。

例でワイルドカードを使用している場合 (つまり、キーが不明な場合)、Cassandra の Hadoop インターフェイスを使用するのが適切なオプションです。質問にリンクされている Spark-Cassandra の例は、Cassandra でのこの Hadoop インターフェイスの使用を示しています。

Calliope は、その機能にアクセスするための単純な API を提供することにより、Hadoop インターフェイスの複雑な使用をカプセル化するライブラリです。特定の Scala 機能 (今後のリリースでの Implicit やマクロなど) を使用するため、Scala でのみ使用可能ジョブへの Hadoop インターフェイス。Calliope (および基盤となる Hadoop インターフェイス) は、ドライバーを使用して Cassandra と対話するよりも 2 ~ 4 倍高速であることがわかりました。

結論: Spring-Data 構成から離れて Cassandra にアクセスします。これにより、単一ノードに制限されるためです。可能であれば単純な並列アクセスを検討するか、Scala で Calliope を使用して検討してください。

于 2014-06-28T22:49:51.103 に答える