Elasticsearch 2.0以上版本根据条件批量删除Java如何实现

时间:2025-04-28 07:07:51

Elasticsearch在2.0以前版本,删除操作有两种方式,一种是通过id来进行删除,但是这种方式一般不常用,因为id不容易得到;另一种方式是通过先查询操作,然后删除,也就是通过这种方式来根据条件批量删除数据:

DeleteByQueryResponse response = ("library")
.setQuery(("title", "ElasticSearch"))
.execute().actionGet();

但是Delete by Query在2.0版本及其以上的版本已经被移除了,因为这种方式会自动强制刷新,所以在大量索引并发的情况下,会很快造成内存溢出。

详情可查看:/guide/en/elasticsearch/client/java-api/1.7/

那么在2.0以后的版本,我们如何来进行批量的删除呢?

我们可以先通过Search API查询,然后得到需要删除的批量数据的id,然后再通过id来删除,但是这种方式在大批量数据的删除的时候,依然是行不通的。

具体实现代码:

	public void deleteByTerm(Client client){
		BulkRequestBuilder bulkRequest = ();
		SearchResponse response = ("megacorp").setTypes("employee")
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setQuery(("first_name", "xiaoming"))
				.setFrom(0).setSize(20).setExplain(true).execute().actionGet();
		for(SearchHit hit : ()){
			String id = ();
			(("megacorp", "employee", id).request());
		}
		BulkResponse bulkResponse = ();
		if (()) {
			for(BulkItemResponse item : ()){
				(());
			}
		}else {
			("delete ok");
		}
		
	}

同样通过delete-by-query插件,我们还可以根据type来批量删除数据,这种方式能够删除大批量的数据,他是现将要删除的数据一个一个做标记,然后再删除,于是效率会比较低。下面是官网的说明:/guide/en/elasticsearch/plugins/2.3/

Queries which match large numbers of documents may run for a long time, as every document has to be deleted individually. Don’t use delete-by-query to clean out all or most documents in an index. Rather create a new index and perhaps reindex the documents you want to keep. 

可见这种删除方式并不适合大批量数据的删除,因为效率真的是很低,我是亲身体验过了。


这种方式需要先引入delete-by-query插件包,然后使用插件的api来删除:

		<dependency>
			<groupId></groupId>
			<artifactId>delete-by-query</artifactId>
			<version>2.3.2</version>
		</dependency>


具体实现代码:

import ;
import ;
import ;
import ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import org.;
import org.;

import ;

public class EsDeleteByType {

	private static final Logger logger = ();
	private Client client;
	
	private static ResourceBundle getEsConfig(){
		return ("elasticsearch");
	}
	
	private void getClient(){
		String clusterName = getEsConfig().getString("clusterName");
		String hosts = getEsConfig().getString("hosts");
		if (hosts == null || clusterName == null) {
			throw new IllegalArgumentException("hosts or clusterName was null.");
		}
		Settings settings = ().put("", clusterName).build();
		client = ()
				.addPlugin()
				.settings(settings).build();
		String[] hostsArray = (",");
		for(String hostAndPort : hostsArray){
			String[] tmpArray = (":");
			try {
				client = ((TransportClient)client).addTransportAddress(new InetSocketTransportAddress((tmpArray[0]), (tmpArray[1])));
			} catch (NumberFormatException e) {
				((e));
			} catch (UnknownHostException e) {
				((e));
			}
		}
	}
	
	/**
	 * 判断一个index中的type是否有数据
	 * @param index
	 * @param type
	 * @return
	 * @throws Exception
	 */
	public Boolean existDocOfType(String index, String type) throws Exception {
		SearchRequestBuilder builder = (index).setTypes(type)
				.setSearchType(SearchType.QUERY_THEN_FETCH)
				.setSize(1);
		SearchResponse response = ().actionGet();
		long docNum = ().getTotalHits();
		if (docNum == 0) {
			return false;
		}
		return true;
	}

	/**
	 * 根据type来删除数据
	 * @param index
	 * @param types
	 * @return
	 */
	public long deleteDocByType(String index, String[] types) {
		getClient();
		long oldTime = ();
		StringBuilder b = new StringBuilder();
		("{\"query\":{\"match_all\":{}}}");
		DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(client, )
		.setIndices(index).setTypes(types)
		.setSource(())
		.execute().actionGet();
		Stack<String> allTypes = new Stack<String>();
		for(String type : types){
			(type);
		}
		while(!()){
			String type = ();
			while(true){
				try {
					if (existDocOfType(index, type) == false) {
						break;
					}
				} catch (Exception e) {
					("queryError: " + ());
				}
			}
		}
		(() - oldTime);
		return ();
	}
}



那么当我们在开发中,使用到elasticsearch的时候,总会涉及到大批量数据的删除,我们要怎么办呢?

经过很长时间的纠结,我发现使用elasticsearch存储数据的时候,千万不要把所有数据都存储于一个index,这样一个是不利于查询的效率,一个是不利于后面的删除,既然我们不能index中去删除部分的大批量数据,那么我们为啥不改变一种思路呢,就是分索引,然后通过索引来删除数据,例如:我在生产上面,每天有5亿的数据,那么我每天在集群中生成一个index用于存储这5亿的数据,如果我们的elasticsearch集群对数据只要求保存7天的数据,超过7天的数据就可以删除了,这样我们可以通过index直接删除7天以前的数据,这种方式,我们在查询的时候不会在所有数据中查询,只需要在所要查询的时间段内查询,便提高了查询的效率,同时删除效率的问题也解决了,能够很快删除不需要的数据,释放掉磁盘空间。

针对于elasticsearch大批量数据删除效率的问题,目前官网上面也没有一个特别好的解决办法,这种方式算是目前还算能行得通的方式了。