私はスパークするのが初めてで、問題があります。csv ファイルである textFile() で生成された RDD を処理しています。行ごとに、複数の行を新しい RDD (複数ではなく単一の行) に返したいと考えています。これは私のコードです:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
ここで行っているのは、最初の csv をフィルター処理して LinearAccelerationEvent のみを取得することです。次に、これらのオブジェクトを LinearAccelerationEvent クラスにマップし、LinearAccelerationEvent オブジェクトの新しい RDD を生成します。最初の csv ファイルの各行に対して、複数の LinearAccelerometerEvent オブジェクトを生成する必要がありますが、その方法がわかりません。私がやりたい理由は、後でこの RDD が次のように cassandra にプッシュされるためです。
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
したがって、理想的なソリューションは次のようになります。
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
関数を使用しforeachPartition()
て for ループの各イベントを Cassandra にプッシュすることはできますが、このアプローチははるかに遅いことがわかりました。私がやりたいことをするために foreach を使用しないことは可能ですか? ありがとうございました