RestHighLevelClient进阶-聚合操作

时间:2024-03-12 11:07:04

之前的博客写过Spring与RestHighLevelClient的基础用法,但是实际使用中,会存在大量的复杂操作,如分组,聚合等。

接下来我们就来看下不太好用的分组聚合基本用法

AggregationBuilder
  • 首先RestHighLevelClient中分组需要用到AggregationBuilder,作为分组条件的构建。
    我们可以看到AggregationBuilder接口有很多实现类,比如AvgAggregationBuilder、MaxAggregationBuilder、SumAggregationBuilder等常用的统计函数。

在使用上还是和之前的大致一样,

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 以batchId为分组条件,terms为分组后的字段名称,field为将被分组的字段名称
TermsAggregationBuilder aggregation = AggregationBuilders.terms("batchId").field("batchId.keyword")
	// 分组求和integral字段,并将求和后的字段名改为score
	// subAggregation为子聚合,即在batchId分组后的小组内聚合
	.subAggregation(AggregationBuilders.sum("score").field("integral"))
	// 注意这里,下面介绍
	.subAggregation(AggregationBuilders.topHits("details").size(1));

sourceBuilder.aggregation(aggregation);

BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolBuilder);

SearchRequest searchRequest = new SearchRequest("table");
searchRequest.source(sourceBuilder);

SearchResponse search = client.search(searchRequest);
// 和之前不同的是这里需要getAggregations获取聚合后的数据
Aggregations aggregations = search.getAggregations();
// 从分组后的数据中拿到batchId的数据,这里以batchId分组,则分组后的数据都在batchId里
ParsedStringTerms terms = aggregations.get("batchId");
// 获取到分组后的所有bucket
List<? extends Terms.Bucket> buckets = terms.getBuckets();

for (Terms.Bucket bucket : buckets) {
	// 解析bucket 因为一级聚合为以batchId分组,二级聚合为求和,所以这里还需要getAggregations获取求和的数据
	Aggregations bucketAggregations = bucket.getAggregations();
	// 这里我是通过debug才找到返回的参数类型的,我不知道在哪找得到这个东西,所以我们拿到了ParsedTopHits,这里我们是取了一个,所以这个值的数组长度为1
	ParsedTopHits topHits = bucketAggregations.get("details");
	// 因为求和和下面的topHits都是AggregationBuilders.terms("batchId").field("batchId.keyword")的subAggreation,所以都属于batchId组内
	// 获取到求和的数据信息
	ParsedSum sum = bucketAggregations.get("integral");
	// 因为topHits中命中的hits肯定至少有一个,要不然也不会成组,所以这里直接获取第一个,并解析成map
	Map<String, Object> sourceAsMap = topHits.getHits().getHits()[0].getSourceAsMap();
	// 将求和后的integral覆盖到原数据中
	sourceAsMap.put("integral", sum.value());
	// 打印出统计后的数据
	System.out.println(sourceAsMap);
}

注意点:

  1. 为什么要用subAggregation(AggregationBuilders.topHits("details").size(1))?
  • 这里有个坑,如果只用batchId分组并用intrgal求和,那么返回值中只会返回batchId与integral求和后的值这两个字段。这里举个例子,现在统计全国所有学校的总人数,那么我们以school分组,求和了人数。但是我们仍然想知道每个学校属于哪个地区的,这里聚合后就不会返回这些信息,让人很难受。
  • 此时我们就可以使用subAggregation(AggregationBuilders.topHits("details").size(1))来获取一个通用的数据,拿到学校所属的地区名称或者其他学校的上级属性。
  1. AggregationBuilders层级
  • 我们注意到subAggregation方法是AggregationBuilder的,所以在构建AggregationBuilders的时候,每一个子AggregationBuilder都可以有子AggregationBuilder,如下
	TermsAggregationBuilder aggregation = AggregationBuilders.terms("batchId").field("batchId.keyword")
		.subAggregation(
				AggregationBuilders.terms("folder").field("folder")
						.subAggregation(AggregationBuilders.sum("integral").field("integral"))
	);
  • 这种就是组内再聚合套组内,但是得注意字段的问题,即若你以batchId分组后,那么组内只有batchId与其他的字段,就和第一个注意点一样,导致组内聚合拿不到相关的字段,导致报错。

结尾

  • 研究了半天最终还是没有用,因为太复杂。还不知道怎么同时以两个条件分组,类似于mysql的groupby a,b
  • 太难了

更新与2020/10/28 我胡汉三又回来啦

最终还是使用了ES的分组,上面的问题也得以解决。

多条件聚合
  • 上面我们说了聚合操作,但是结尾处说了不知道怎么同时以两个条件分组,类似于mysql的groupby a,b,下面我们来看下怎么以多字段分组;
Script script = new Script("doc[\'userId.keyword\'].values +\'#\'+ doc[\'taskId.keyword\'].values");
AggregationBuilder aggregation = AggregationBuilders.cardinality("user").script(script);
sourceBuilder.aggregation(aggregation);
  • 我们可以看到这里使用了script来解决了多字段分组聚合的问题,这里script的意思是以[userId]#[taskId]为唯一键来进行分组,当然这个script可以自己改,分隔符也不限定于#,我们可以改为"doc[\'userId.keyword\'].values +\'####\'+ doc[\'taskId.keyword\'].values + \'####\' doc[batchId.keyword].values"来完成三字段分组。

  • 这样我们从bucket中可以获取到key,则key的格式即为[userId]#[taskId],而getAggregations则为聚合的数据


更新与2020/10/29
  • 上面说到聚合,然后今天自信满满的进行了自测,发现分页出现了问题。 当时的写法是这样的:
	TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("user").script(script)
		.subAggregation(AggregationBuilders.sum("integral").field("integral"));
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	sourceBuilder.aggregation(aggregation);
	sourceBuilder.from(30);
	sourceBuilder.size(10);

当我尝试变化from发现获取的值永远都是前十条。
查询各种网站后得出的结论也并不乐观,比如 https://blog.csdn.net/laoyang360/article/details/79112946

截取博客中一段话,如下:

  • 可以看到走正常的聚合是行不通的,于是看见了第三条,即此博主没有深入研究的,推测是不是有可能实现。即一次取全部的最大值的聚合,然后再聚合的数据里进行排序分页。
  • 最终代码如下:
	TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("user").script(script)
                .subAggregation(AggregationBuilders.sum("integral").field("integral"))
                .subAggregation(AggregationBuilders.topHits("sample").size(1)).size(9999);
        List<FieldSortBuilder> fieldSorts=new ArrayList<>();
        fieldSorts.add(new FieldSortBuilder("integral").order(SortOrder.DESC));
        aggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", fieldSorts).from(params.getOffset()).size(params.getPageSize()));

        Aggregations aggregations = IntegralQuery.aggregation(boolBuilder, aggregationBuilder);

不知道这样是不是上面截图中分区来取的意思,反正是实现了聚合分页。前提聚合后的.size需要设置大一些,否则下面排序分页会取不到。这种做法就是先进行聚合分组,然后再分组后bucket中进行排序分页。