ここに考えられる解決策の 1 つがあります。アクセスしたいキーバリューストア(あなたの場合はRocksDB)のクライアントライブラリがあると思います。
KeyValuePair
キー値ストアからの 1 つのキー値ペアを表す Bean クラスを表します。
クラス
/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
public KeyValueIterator() {
//TODO initialize your custom reader using java client library
}
@Override
public boolean hasNext() {
//TODO
}
@Override
public KeyValuePair next() {
//TODO
}
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
@Override
public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
//ignore empty 'keyValuePair' object
return new KeyValueIterator();
}
}
KeyValue RDD を作成する
/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
ノート:
上記のソリューションは、デフォルトで 2 つのパーティションを持つ RDD を作成します (そのうちの 1 つは空になります)。keyValuePairRDD
エグゼキューター間で処理を分散するために、変換を適用する前にパーティションを増やします。パーティションを増やすさまざまな方法:
keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)