0

現在、elasticsearch hadoop はデータセット/rdd を 1 対 1 のマッピングでドキュメントに変換しています。つまり、データセットの 1 行が 1 つのドキュメントに変換されます。私たちのシナリオでは、このようなことをしています

ユニのために

PUT spark/docs/1
{
"_k":"one",
"_k":"two",
"_k":"three" // large sets , we dont need to store much, we just want to map multiple keys to single value.
"_v" :"key:
}

GET spark/docs/_search
{
"query" : {
  "constant_score" : {
    "filter" : {
      "terms" : {
        "_k" : ["one"] // all values work.
        }
      }
    }
  }
}

より良い戦略があれば、提案してください。

以下のコードは機能していませんが、理論的には以下のようなことを達成しようとしています

  final Dataset<String> df = spark.read().csv("src/main/resources/star2000.csv").select("_c1").dropDuplicates().as(Encoders.STRING());
  final Dataset<ArrayList> arrayListDataset = df.mapPartitions(new MapPartitionsFunction<String, ArrayList>() {
        @Override
        public Iterator<ArrayList> call(Iterator<String> iterator) throws Exception {
            ArrayList<String> s = new ArrayList<>();
            iterator.forEachRemaining(it -> s.add(it));
            return Iterators.singletonIterator(s);
        }
    }, Encoders.javaSerialization(ArrayList.class));
  JavaEsSparkSQL.saveToEs(arrayListDataset,"spark/docs");

OOM が発生する可能性があるため、完全なデータセットを 1 つのリストに収集したくないため、パーティションごとにリストを取得し、パーティション キーに対してインデックスを作成することを計画しています。

4

2 に答える 2

0

使用しているソースコードを投稿すると役立ちますが、何を達成しようとしているのかについての質問も明確ではありません。

キー フィールド (_k) に配列をポストし、値フィールド (_v) に別の値をポストしたいと思いますか?

したがって、JavaPairRDD を作成し、それを Elasticsearch に保存することができます。次のようになります。

String[] keys = {"one", "two", "three"};
String value = "key";

List<Tuple2<String[],String>> l = new ArrayList<Tuple2<String[],String>>();
l.add(new Tuple2<String[],String>(keys, value));

JavaPairRDD<String[],String> R = ctx.parallelizePairs(l);

JavaEsSpark.saveToEs(R,"index/type");
于 2016-12-28T11:04:20.810 に答える