0

Elastic4s DSL を使用して、マルチレベルの集計クエリを動的に実行することは可能ですか?

httpクライアントを使用するのは簡単です

multiLevelAggregation

 入力: フィールド[0..N]
 出力: フィールド タプルでグループ化されたデータ

 手順:
 1. マルチレベルの Elasticsearch 集計クエリ (JSON) を構築する
 2.elasticsearch サーバーでクエリを実行する
 3.結果を平坦化して返す

しかし、elastic4s または Java クライアントを使用してこれを行う方法。

4

1 に答える 1

1

私の問題を注意深く理解した後、これに対する解決策を見つけることができました。最初は、これはelastic4sの制限であると考えていましたが、そうではなく、elastic4sクライアントを介して複数フィールドの集計クエリを動的に構築するのは簡単です。これが私の解決策です

//For building aggregation query
def buildAgg(groups: Seq[String])(leafAggBuilder: () => AbstractAggregationDefinition): AbstractAggregationDefinition = {
  groups match {
    case x :: xs => aggregation.terms("termAgg").field(x).aggregations(buildAgg(xs)(leafAggBuilder))
    case Nil => leafAggBuilder()
  }
}

//An example leaf aggregation builder
def buildLeafAgg(aggFuncInfo: Pair[String, String])(): AbstractAggregationDefinition = {
  aggFuncInfo._1 match {
    case "avg" => aggregation.avg("aggFunc").field(aggFuncInfo._2)
    case "sum" => aggregation.sum("aggFunc").field(aggFuncInfo._2)
    case "cardinality" => aggregation.cardinality("aggFunc").field(aggFuncInfo._2)
    case _ => aggregation.count("aggFunc").field(aggFuncInfo._2)
  }
}

//For parsing aggregation
def parseAgg[T](groups: Seq[String], agg: Aggregation, allGroups: Seq[String])(leafAggParser: (Seq[String], Aggregation) => Seq[T]): Seq[T] = {
  groups match {
    case x :: xs => {
      val groupAggs = agg.asInstanceOf[StringTerms].getBuckets.asScala.toList
      (for {
        groupAgg <- groupAggs
        aa = groupAgg.getAggregations.asList.asScala.head
        gkey = groupAgg.getKeyAsString
        gacc = allGroups :+ gkey
      } yield parseAgg(xs, aa, gacc)(leafAggParser)).flatten
    }

    case Nil => {
      leafAggParser(allGroups, agg)
    }
  }
}

//An example leaf aggregation parser
def parseSimpleLeafAgg(allGroups: Seq[String], agg: Aggregation): Seq[GroupStats] = {
  val value = agg.asInstanceOf[InternalNumericMetricsAggregation.SingleValue].value()
  val groupId = allGroups.mkString(".")
  Seq(GroupStats(groupId, value))
}

//Usage: Build Query and Parse result
def groupStats(groupFields: Seq[String]): Seq[GroupStats] = {
  val resp = client.execute {
    def leafPlainAggBuilder = buildLeafAgg(("count", "V1")) _
    search(esIndex).size(0).aggregations(buildAgg(groupFields)(leafPlainAggBuilder))
  }.await
  //get the root aggregation
  val agg = resp.aggregations.asList().asScala.head
  def leafAggParser = parseSimpleLeafAgg _
  val res = parseAgg(groupFields, agg, Seq())(leafAggParser)
  res
}

于 2016-09-09T14:29:04.903 に答える