RDD の最初の要素を取得するメソッド rdd.firstwfirst() を知っています。
また、最初の「num」要素を提供するメソッド rdd.take(num) もあります。
しかし、インデックスで要素を取得する可能性はありませんか?
ありがとう
RDD の最初の要素を取得するメソッド rdd.firstwfirst() を知っています。
また、最初の「num」要素を提供するメソッド rdd.take(num) もあります。
しかし、インデックスで要素を取得する可能性はありませんか?
ありがとう
これは、最初に RDD のインデックスを作成することで可能になります。この変換zipWithIndex
により、各要素に元の順序で番号が付けられ、安定したインデックスが提供されます。
与えられた:rdd = (a,b,c)
val withIndex = rdd.zipWithIndex // ((a,0),(b,1),(c,2))
要素をインデックスで検索する場合、この形式は役に立ちません。まず、インデックスをキーとして使用する必要があります。
val indexKey = withIndex.map{case (k,v) => (v,k)} //((0,a),(1,b),(2,c))
lookup
これで、 PairRDDのアクションを使用して、キーで要素を見つけることができます。
val b = indexKey.lookup(1) // Array(b)
同じ RDD で頻繁に使用することが予想される場合は、パフォーマンスを向上させるために RDD をlookup
キャッシュすることをお勧めします。indexKey
Java APIを使用してこれを行う方法は、読者の課題として残されています。
私もしばらくこれに行き詰まったので、Maasgの答えを拡張しますが、Javaのインデックスで値の範囲を探すために答えます(上部に4つの変数を定義する必要があります):
DataFrame df;
SQLContext sqlContext;
Long start;
Long end;
JavaPairRDD<Row, Long> indexedRDD = df.toJavaRDD().zipWithIndex();
JavaRDD filteredRDD = indexedRDD.filter((Tuple2<Row,Long> v1) -> v1._2 >= start && v1._2 < end);
DataFrame filteredDataFrame = sqlContext.createDataFrame(filteredRDD, df.schema());
このコードを実行するときは、クラスターに Java 8 が必要になることに注意してください (ラムダ式が使用されているため)。
また、zipWithIndex はおそらく高価です。
このクラスを試して、インデックスでアイテムをフェッチしました。まず、 を構築するnew IndexedFetcher(rdd, itemClass)
と、RDD の各パーティションの要素数がカウントされます。次に、 を呼び出すとindexedFetcher.get(n)
、そのインデックスを含むパーティションのみでジョブが実行されます。
1.8 ではなく Java 1.7 を使用してこれをコンパイルする必要があることに注意してください。Spark 1.1.0 の時点で、com.esotericsoftware.reflectasm 内にバンドルされている org.objectweb.asm は、まだ Java 1.8 クラスを読み取ることができません (Java 1.8 関数を runJob しようとすると、IllegalStateException がスローされます)。
import java.io.Serializable;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import scala.reflect.ClassTag;
public static class IndexedFetcher<E> implements Serializable {
private static final long serialVersionUID = 1L;
public final RDD<E> rdd;
public Integer[] elementsPerPartitions;
private Class<?> clazz;
public IndexedFetcher(RDD<E> rdd, Class<?> clazz){
this.rdd = rdd;
this.clazz = clazz;
SparkContext context = this.rdd.context();
ClassTag<Integer> intClassTag = scala.reflect.ClassTag$.MODULE$.<Integer>apply(Integer.class);
elementsPerPartitions = (Integer[]) context.<E, Integer>runJob(rdd, IndexedFetcher.<E>countFunction(), intClassTag);
}
public static class IteratorCountFunction<E> extends scala.runtime.AbstractFunction2<TaskContext, scala.collection.Iterator<E>, Integer> implements Serializable {
private static final long serialVersionUID = 1L;
@Override public Integer apply(TaskContext taskContext, scala.collection.Iterator<E> iterator) {
int count = 0;
while (iterator.hasNext()) {
count++;
iterator.next();
}
return count;
}
}
static <E> scala.Function2<TaskContext, scala.collection.Iterator<E>, Integer> countFunction() {
scala.Function2<TaskContext, scala.collection.Iterator<E>, Integer> function = new IteratorCountFunction<E>();
return function;
}
public E get(long index) {
long remaining = index;
long totalCount = 0;
for (int partition = 0; partition < elementsPerPartitions.length; partition++) {
if (remaining < elementsPerPartitions[partition]) {
return getWithinPartition(partition, remaining);
}
remaining -= elementsPerPartitions[partition];
totalCount += elementsPerPartitions[partition];
}
throw new IllegalArgumentException(String.format("Get %d within RDD that has only %d elements", index, totalCount));
}
public static class FetchWithinPartitionFunction<E> extends scala.runtime.AbstractFunction2<TaskContext, scala.collection.Iterator<E>, E> implements Serializable {
private static final long serialVersionUID = 1L;
private final long indexWithinPartition;
public FetchWithinPartitionFunction(long indexWithinPartition) {
this.indexWithinPartition = indexWithinPartition;
}
@Override public E apply(TaskContext taskContext, scala.collection.Iterator<E> iterator) {
int count = 0;
while (iterator.hasNext()) {
E element = iterator.next();
if (count == indexWithinPartition)
return element;
count++;
}
throw new IllegalArgumentException(String.format("Fetch %d within partition that has only %d elements", indexWithinPartition, count));
}
}
public E getWithinPartition(int partition, long indexWithinPartition) {
System.out.format("getWithinPartition(%d, %d)%n", partition, indexWithinPartition);
SparkContext context = rdd.context();
scala.Function2<TaskContext, scala.collection.Iterator<E>, E> function = new FetchWithinPartitionFunction<E>(indexWithinPartition);
scala.collection.Seq<Object> partitions = new scala.collection.mutable.WrappedArray.ofInt(new int[] {partition});
ClassTag<E> classTag = scala.reflect.ClassTag$.MODULE$.<E>apply(this.clazz);
E[] result = (E[]) context.<E, E>runJob(rdd, function, partitions, true, classTag);
return result[0];
}
}