28

RDD の最初の要素を取得するメソッド rdd.firstwfirst() を知っています。

また、最初の「num」要素を提供するメソッド rdd.take(num) もあります。

しかし、インデックスで要素を取得する可能性はありませんか?

ありがとう

4

3 に答える 3

59

これは、最初に RDD のインデックスを作成することで可能になります。この変換zipWithIndexにより、各要素に元の順序で番号が付けられ、安定したインデックスが提供されます。

与えられた:rdd = (a,b,c)

val withIndex = rdd.zipWithIndex // ((a,0),(b,1),(c,2))

要素をインデックスで検索する場合、この形式は役に立ちません。まず、インデックスをキーとして使用する必要があります。

val indexKey = withIndex.map{case (k,v) => (v,k)}  //((0,a),(1,b),(2,c))

lookupこれで、 PairRDDのアクションを使用して、キーで要素を見つけることができます。

val b = indexKey.lookup(1) // Array(b)

同じ RDD で頻繁に使用することが予想される場合は、パフォーマンスを向上させるために RDD をlookupキャッシュすることをお勧めします。indexKey

Java APIを使用してこれを行う方法は、読者の課題として残されています。

于 2014-11-09T16:46:51.207 に答える
2

私もしばらくこれに行き詰まったので、Maasgの答えを拡張しますが、Javaのインデックスで値の範囲を探すために答えます(上部に4つの変数を定義する必要があります):

DataFrame df;
SQLContext sqlContext;
Long start;
Long end;

JavaPairRDD<Row, Long> indexedRDD = df.toJavaRDD().zipWithIndex();
JavaRDD filteredRDD = indexedRDD.filter((Tuple2<Row,Long> v1) -> v1._2 >= start && v1._2 < end);
DataFrame filteredDataFrame = sqlContext.createDataFrame(filteredRDD, df.schema());

このコードを実行するときは、クラスターに Java 8 が必要になることに注意してください (ラムダ式が使用されているため)。

また、zipWithIndex はおそらく高価です。

于 2016-08-02T11:01:17.767 に答える
2

このクラスを試して、インデックスでアイテムをフェッチしました。まず、 を構築するnew IndexedFetcher(rdd, itemClass)と、RDD の各パーティションの要素数がカウントされます。次に、 を呼び出すとindexedFetcher.get(n)、そのインデックスを含むパーティションのみでジョブが実行されます。

1.8 ではなく Java 1.7 を使用してこれをコンパイルする必要があることに注意してください。Spark 1.1.0 の時点で、com.esotericsoftware.reflectasm 内にバンドルされている org.objectweb.asm は、まだ Java 1.8 クラスを読み取ることができません (Java 1.8 関数を runJob しようとすると、IllegalStateException がスローされます)。

import java.io.Serializable;

import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;

import scala.reflect.ClassTag;

public static class IndexedFetcher<E> implements Serializable {
    private static final long serialVersionUID = 1L;
    public final RDD<E> rdd;
    public Integer[] elementsPerPartitions;
    private Class<?> clazz;
    public IndexedFetcher(RDD<E> rdd, Class<?> clazz){
        this.rdd = rdd;
        this.clazz = clazz;
        SparkContext context = this.rdd.context();
        ClassTag<Integer> intClassTag = scala.reflect.ClassTag$.MODULE$.<Integer>apply(Integer.class);
        elementsPerPartitions = (Integer[]) context.<E, Integer>runJob(rdd, IndexedFetcher.<E>countFunction(), intClassTag);
    }
    public static class IteratorCountFunction<E> extends scala.runtime.AbstractFunction2<TaskContext, scala.collection.Iterator<E>, Integer> implements Serializable {
        private static final long serialVersionUID = 1L;
        @Override public Integer apply(TaskContext taskContext, scala.collection.Iterator<E> iterator) {
            int count = 0;
            while (iterator.hasNext()) {
                count++;
                iterator.next();
            }
            return count;
        }
    }
    static <E> scala.Function2<TaskContext, scala.collection.Iterator<E>, Integer> countFunction() {
        scala.Function2<TaskContext, scala.collection.Iterator<E>, Integer> function = new IteratorCountFunction<E>();
        return function;
    }
    public E get(long index) {
        long remaining = index;
        long totalCount = 0;
        for (int partition = 0; partition < elementsPerPartitions.length; partition++) {
            if (remaining < elementsPerPartitions[partition]) {
                return getWithinPartition(partition, remaining);
            }
            remaining -= elementsPerPartitions[partition];
            totalCount += elementsPerPartitions[partition];
        }
        throw new IllegalArgumentException(String.format("Get %d within RDD that has only %d elements", index, totalCount));
    }
    public static class FetchWithinPartitionFunction<E> extends scala.runtime.AbstractFunction2<TaskContext, scala.collection.Iterator<E>, E> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long indexWithinPartition;
        public FetchWithinPartitionFunction(long indexWithinPartition) {
            this.indexWithinPartition = indexWithinPartition;
        }
        @Override public E apply(TaskContext taskContext, scala.collection.Iterator<E> iterator) {
            int count = 0;
            while (iterator.hasNext()) {
                E element = iterator.next();
                if (count == indexWithinPartition)
                    return element;
                count++;
            }
            throw new IllegalArgumentException(String.format("Fetch %d within partition that has only %d elements", indexWithinPartition, count));
        }
    }
    public E getWithinPartition(int partition, long indexWithinPartition) {
        System.out.format("getWithinPartition(%d, %d)%n", partition, indexWithinPartition);
        SparkContext context = rdd.context();
        scala.Function2<TaskContext, scala.collection.Iterator<E>, E> function = new FetchWithinPartitionFunction<E>(indexWithinPartition);
        scala.collection.Seq<Object> partitions = new scala.collection.mutable.WrappedArray.ofInt(new int[] {partition});
        ClassTag<E> classTag = scala.reflect.ClassTag$.MODULE$.<E>apply(this.clazz);
        E[] result = (E[]) context.<E, E>runJob(rdd, function, partitions, true, classTag);
        return result[0];
    }
}
于 2014-11-17T20:58:01.987 に答える