1

Apache Spark の初心者であり、Spark で Cassandra データを取得する際に問題に直面しています。

List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
                    cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
                    where("Id=? and date IN ?","Open",dates);

このクエリは、cassandra サーバー上のデータをフィルタリングしていません。この Java ステートメントが実行されている間、メモリが消費され、最終的に spark java.lang.OutOfMemoryError 例外がスローされます。https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.mdに記載されているように、クエリはクライアント側ではなく cassandra サーバーでデータを除外する必要があります。

cassandra cqlsh でフィルターを使用してクエリを実行している間、正常に実行されますが、フィルター (where 句) を使用せずにクエリを実行すると、予期されるタイムアウトが発生します。したがって、spark がクライアント側でフィルターを適用していないことは明らかです。

SparkConf conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[8]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15")

クライアント側でフィルターが適用される理由と、サーバー側でフィルターを適用するためにどのように改善できるか。

Windows プラットフォームで Cassandra クラスターの上に Spark クラスターを構成するにはどうすればよいでしょうか??

4

3 に答える 3

2

あなたが提供したセクションを読むことから、SparkでCassandraを使用していない(ありがとう)私はそれを見る:

注: 生成された CQL クエリに ALLOW FILTERING 句が暗黙的に追加されますが、現在、すべての述語が Cassandra エンジンで許可されているわけではありません。この制限は、将来の Cassandra リリースで対処される予定です。現在、ALLOW FILTERING は、セカンダリ インデックスまたはクラスタリング列によってインデックス付けされた列で適切に機能します。

「IN」述語がサポートされていないことは確かです(ただし、テストしていません)。 /src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

そのため、where 節を Id に制限して (そのためのセカンダリ インデックスがあると仮定して)、日付範囲にスパーク フィルタリングを使用することができます。

于 2015-06-30T17:57:04.093 に答える
1

テーブルを RDD ではなく DataFrame として読み取ることをお勧めします。これらは Spark 1.3 以降で利用できます。次に、CQL クエリを次のような文字列として指定できます。

CassandraSQLContext sqlContext = new CassandraSQLContext(sc);

String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
DataFrame resultsFrame = sqlContext.sql(query);

System.out.println(resultsFrame.count());

それで、それを試して、それがあなたにとってよりうまくいくかどうか見てください.

DataFrame にデータを取得したら、そのデータに対して Spark SQL 操作を実行できます。RDD のデータが必要な場合は、DataFrame を RDD に変換できます。

于 2015-07-01T00:03:01.553 に答える
1

SparkConfing で spark.cassandra.input.split.size_in_mb を設定すると、問題が解決しました。

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

Spark-cassnadra-connector は spark.cassandra.input.split.size_in_mb の間違った値を読み取るため、SparkConf でこの値をオーバーライドすると機能します。現在、IN句もうまく機能しています。

于 2015-07-01T06:39:14.347 に答える