10

Spark がストレージとやり取りする方法を理解できません。

RocksDB データベース (またはその他のキーと値のストア) からデータをフェッチする Spark クラスターを作成したいと考えています。ただし、現時点で私ができる最善の方法は、データベースからデータセット全体を各クラスター ノードのメモリ (たとえば、マップ) にフェッチし、そのオブジェクトから RDD を構築することです。

必要なデータのみをフェッチするにはどうすればよいですか (Spark が HDFS で行うように)? Hadoop Input Format と Record Readers について読んだことがありますが、何を実装すべきかを完全には理解していません。

これは幅広い質問であることは承知していますが、私を始めるための助けをいただければ幸いです。前もって感謝します。

4

1 に答える 1

7

ここに考えられる解決策の 1 つがあります。アクセスしたいキーバリューストア(あなたの場合はRocksDB)のクライアントライブラリがあると思います。
KeyValuePairキー値ストアからの 1 つのキー値ペアを表す Bean クラスを表します。

クラス

/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
    public KeyValueIterator() {
        //TODO initialize your custom reader using java client library
    }
    @Override
    public boolean hasNext() {
        //TODO
    }

    @Override
    public KeyValuePair next() {
        //TODO
    }
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
    @Override
    public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
        //ignore empty 'keyValuePair' object
        return new KeyValueIterator();
    }
}

KeyValue RDD を作成する

/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/    
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());

ノート:

上記のソリューションは、デフォルトで 2 つのパーティションを持つ RDD を作成します (そのうちの 1 つは空になります)。keyValuePairRDDエグゼキューター間で処理を分散するために、変換を適用する前にパーティションを増やします。パーティションを増やすさまざまな方法:

keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)
于 2016-12-14T09:59:06.090 に答える