1

私はスパークするのが初めてで、問題があります。csv ファイルである textFile() で生成された RDD を処理しています。行ごとに、複数の行を新しい RDD (複数ではなく単一の行) に返したいと考えています。これは私のコードです:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
            new Function<String, Boolean>() {
                public Boolean call(String line) {
                    return line.contains("LinearAccelerationEvent");
                }
            }).map(
            new Function<String, LinearAccelerationEvent>() {
                public LinearAccelerationEvent call(String line) throws Exception {
                    String[] fields = line.split(",");
                    LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                    return linearAccelerationEvent;
                }
            }).cache();

ここで行っているのは、最初の csv をフィルター処理して LinearAccelerationEvent のみを取得することです。次に、これらのオブジェクトを LinearAccelerationEvent クラスにマップし、LinearAccelerationEvent オブジェクトの新しい RDD を生成します。最初の csv ファイルの各行に対して、複数の LinearAccelerometerEvent オブジェクトを生成する必要がありますが、その方法がわかりません。私がやりたい理由は、後でこの RDD が次のように cassandra にプッシュされるためです。

javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();

したがって、理想的なソリューションは次のようになります。

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
                new Function<String, Boolean>() {
                    public Boolean call(String line) {
                        return line.contains("LinearAccelerationEvent");
                    }
                }).map(
                new Function<String, LinearAccelerationEvent>() {
                    public LinearAccelerationEvent call(String line) throws Exception {
                        String[] fields = line.split(",");
                        for() {
                           LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                           return linearAccelerationEvent;
                        }
                }
            }).cache();

関数を使用しforeachPartition()て for ループの各イベントを Cassandra にプッシュすることはできますが、このアプローチははるかに遅いことがわかりました。私がやりたいことをするために foreach を使用しないことは可能ですか? ありがとうございました

4

1 に答える 1