spring boot 集成 elasticsearch 、x-pack 权限

时间:2025-04-25 08:15:59
package com.****.****.config; import com.****.****.common.utils.StringUtils; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import java.net.InetAddress; import java.net.UnknownHostException; /** *TransportClientFactoryBean * *@author liuf */ @Configuration public class ElasticsearchConfiguration implements FactoryBean<TransportClient>, InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfiguration.class); @Value("${-nodes}") private String clusterNodes ; @Value("${-name}") private String clusterName; @Value("${-password}") private String usernamePassword; /** * 超时时间 */ @Value("${.connect_timeout}") private String connectTimeout; /** * 等待来自节点的ping响应的时间。默认值为5s。 */ @Value("${.ping_timeout}") private String pingTimeout; /** * 采样/ ping列出和连接的节点的频率。默认值为5s */ @Value("${.nodes_sampler_interval}") private String nodesSamplerInterval; private TransportClient client; @Override public void destroy() throws Exception { try { logger.info("Closing elasticSearch client"); if (client != null) { client.close(); } } catch (final Exception e) { logger.error("Error closing ElasticSearch client: ", e); } } @Override public TransportClient getObject() throws Exception { return client; } @Override public Class<TransportClient> getObjectType() { return TransportClient.class; } @Override public boolean isSingleton() { return false; } @Override public void afterPropertiesSet() throws Exception { buildClient(); } protected void buildClient() { try { //x-pack权限方法 if (StringUtils.isNotBlank(usernamePassword)) { PreBuiltXPackTransportClient preBuiltTransportClient = new PreBuiltXPackTransportClient(settings()); if (!"".equals(clusterNodes)) { for (String nodes : clusterNodes.split(",")) { String InetSocket[] = nodes.split(":"); String Address = InetSocket[0]; Integer port = Integer.valueOf(InetSocket[1]); preBuiltTransportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(Address), port)); } client = preBuiltTransportClient; // ().getThreadContext().putHeader("Authorization", "Basic "+ Encodes.decodeBase64(usernamePassword)); } } else { //没有x-pack权限访问 PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings()); if (!"".equals(clusterNodes)) { for (String nodes:clusterNodes.split(",")) { String InetSocket [] = nodes.split(":"); String Address = InetSocket[0]; Integer port = Integer.valueOf(InetSocket[1]); preBuiltTransportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(Address),port )); } client = preBuiltTransportClient; } } } catch (UnknownHostException e) { logger.error(e.getMessage()); } } private Settings settings(){ Settings.Builder settings = Settings.builder(); settings.put("",clusterName) // .put(".ignore_cluster_name", false) // .put("", true) // .put("", true) .put(".nodes_sampler_interval", nodesSamplerInterval) // elasticSearch 健康检查时间 .put(".connect_timeout", connectTimeout) // elasticSearch 超时时间 .put(".ping_timeout",pingTimeout); //elasticSearch ping //x-pack权限方法 if (StringUtils.isNotBlank(usernamePassword)) { settings.put("", usernamePassword); } else { } return settings.build(); } }