Nacos Registry 3 - Client Side (Registration and Heartbeat)

Date: 2022-11-11 16:18:12

0\. Environment

  • Nacos: 1.4.1
  • Spring Cloud: 2020.0.2
  • Spring Boot: 2.4.4
  • Spring Cloud Alibaba: 2.2.5.RELEASE

Test code: github.com/hsfxuebao/s…

1\. Entry point analysis

spring.factories

[Figure: contents of spring.factories]

Among the entries there, NacosAutoServiceRegistration is injected:

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
        NacosServiceRegistry registry,
        AutoServiceRegistrationProperties autoServiceRegistrationProperties,
        NacosRegistration registration) {
    return new NacosAutoServiceRegistration(registry,
            autoServiceRegistrationProperties, registration);
}

Let's look at this class's register() method and the calls it delegates to:

// com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration#register
@Override
protected void register() {
    if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
        log.debug("Registration disabled.");
        return;
    }
    if (this.registration.getPort() < 0) {
        this.registration.setPort(getPort().get());
    }
    super.register();
}

// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
protected void register() {
    this.serviceRegistry.register(getRegistration());
}

// com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
@Override
public void register(Registration registration) {

    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }

    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

The core call is namingService.registerInstance, which leads to NacosNamingService#registerInstance:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // Format: groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // Only ephemeral instances send heartbeats to the server
    if (instance.isEphemeral()) {
        // Build the heartbeat payload
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // Schedule the periodic heartbeat task
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // Send the registration request to the server
    serverProxy.registerService(groupedServiceName, groupName, instance);
}
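The grouped name format mentioned in the comment above (groupName@@serviceName) can be illustrated with a small sketch. This is not the Nacos source: the class GroupedNameSketch and its methods are hypothetical, and the fallback to DEFAULT_GROUP for names without a separator is an assumption made for the example.

```java
// Illustrative sketch only: GroupedNameSketch is a hypothetical class, not part of Nacos.
final class GroupedNameSketch {
    private static final String SEPARATOR = "@@";
    // Assumed fallback group for names that carry no group part.
    private static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    // Joins group and service name in the "groupName@@serviceName" form.
    static String groupedName(String serviceName, String groupName) {
        if (serviceName == null || serviceName.isEmpty()) {
            throw new IllegalArgumentException("serviceName must not be empty");
        }
        return groupName + SEPARATOR + serviceName;
    }

    // Recovers the group part of a grouped name.
    static String groupOf(String groupedName) {
        int idx = groupedName.indexOf(SEPARATOR);
        return idx < 0 ? DEFAULT_GROUP : groupedName.substring(0, idx);
    }

    public static void main(String[] args) {
        String grouped = groupedName("order-service", "DEFAULT_GROUP");
        System.out.println(grouped);
        System.out.println(groupOf(grouped));
    }
}
```

Carrying the group inside the service name lets later steps, such as the heartbeat task, recover the group from the name alone.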

The next two sections look at these two core steps in turn: registration and heartbeat.

1\. Client registration

The registration method is NamingProxy#registerService:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);

    // Flatten the instance into request parameters
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // Send the registration request
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

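registerService first copies the instance's core attributes into params; the server later parses them back out of the request. It then calls reqApi. This method deserves a closer look, because all the client-to-server requests that follow go through it: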

//com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.lang.String)
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    // Delegate with an empty request body
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
    // getServerList() returns the server addresses from the client configuration
    return reqApi(api, params, body, getServerList(), method);
}

Next comes the overload that takes the server list:

public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (StringUtils.isNotBlank(nacosDomain)) {
        // Retry up to maxRetry times (3 by default)
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        // Pick a random starting index
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        // Try each server in turn, wrapping around the list, until one call succeeds
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                // Send the request
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }
    ...
    throw new NacosException(exception.getErrCode(),
            "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());

}

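The client starts at a random server and tries the servers one after another until a request succeeds. If every server fails, it throws a NacosException carrying the last error.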
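The failover loop can be reduced to a few lines. The following is a minimal sketch of the same idea rather than the Nacos implementation: FailoverSketch and callWithFailover are hypothetical names, and a plain Function stands in for callServer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Illustrative sketch only: FailoverSketch is a hypothetical class, not part of Nacos.
final class FailoverSketch {
    // Starts at a random server and walks the list once, returning the first successful result.
    static <T> T callWithFailover(List<String> servers, Function<String, T> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        int index = new Random().nextInt(servers.size());
        RuntimeException last = null;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed after all servers (" + servers + ") tried", last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        // Pretend that only the second server is reachable.
        String result = callWithFailover(servers, s -> {
            if (!s.startsWith("10.0.0.2")) {
                throw new RuntimeException("connection refused: " + s);
            }
            return "ok from " + s;
        });
        System.out.println(result);
    }
}
```

The random starting index spreads requests from many clients across the cluster, while the wrap-around loop guarantees that each server is tried at most once per request.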

public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
        String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    injectSecurityInfo(params);
    Header header = builderHeader();

    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!IPUtil.containsPort(curServer)) {
            curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        // Send the request as a form exchange
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);

        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}

public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
        String httpMethod, Type responseType) throws Exception {
    RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
            header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
    // Execute the request
    return execute(url, httpMethod, requestHttpEntity, responseType);
}

private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
        Type responseType) throws Exception {
    URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
    ...

    ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
    HttpClientResponse response = null;
    try {
        // Obtain Nacos's own HTTP client (a wrapper around the JDK's HTTP client) and execute the request
        response = this.requestClient().execute(uri, httpMethod, requestEntity);
        return responseHandler.handle(response);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

The request is sent through Nacos's own HTTP client, which is essentially a wrapper around the JDK's HTTP client.

Registration request: POST, URL: /nacos/v1/ns/instance
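To make the URL handling in callServer and the query string passed to buildUri more concrete, here is a simplified sketch. RequestUrlSketch is a hypothetical class; the default port 8848 and the "http://" prefix are assumptions for the example, and the ':' check deliberately ignores IPv6 addresses.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: RequestUrlSketch is a hypothetical class, not part of Nacos.
final class RequestUrlSketch {
    // Assumed defaults for this sketch.
    static final int DEFAULT_PORT = 8848;
    static final String DEFAULT_PREFIX = "http://";

    // Mirrors the branch in callServer: keep an explicit scheme, otherwise add port and prefix.
    static String baseUrl(String server, String api) {
        if (server.startsWith("http://") || server.startsWith("https://")) {
            return server + api;
        }
        // Simplification: any ':' counts as a port separator, so IPv6 literals are not handled.
        String hostPort = server.contains(":") ? server : server + ":" + DEFAULT_PORT;
        return DEFAULT_PREFIX + hostPort + api;
    }

    // Encodes the parameter map as key=value pairs joined by '&'.
    static String queryString(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
        }
        return sb.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@order-service");
        params.put("ip", "10.0.0.1");
        params.put("port", "8080");
        System.out.println(baseUrl("127.0.0.1", "/nacos/v1/ns/instance") + "?" + queryString(params));
    }
}
```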

2\. Client heartbeat

// Only ephemeral instances send heartbeats to the server
if (instance.isEphemeral()) {
    // Build the heartbeat payload
    BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
    // Schedule the periodic heartbeat task
    beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}

Once the BeatInfo is built, addBeatInfo schedules the heartbeat requests to the server:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    // Key format: groupName@@serviceName#ip#port
    // The key identifies exactly one instance (host and port)
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    // dom2Beat maps each instance key to the BeatInfo used for its heartbeats
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // Schedule the heartbeat task
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
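The bookkeeping in addBeatInfo boils down to a map keyed by instance, where a newly added beat replaces and stops the previous one. A minimal sketch of that idea follows; BeatRegistrySketch and its nested Beat class are hypothetical stand-ins for BeatReactor and BeatInfo, and scheduling is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: BeatRegistrySketch and Beat are hypothetical, not Nacos classes.
final class BeatRegistrySketch {
    static final class Beat {
        final String ip;
        final int port;
        volatile boolean stopped; // a stopped beat must not send or reschedule itself

        Beat(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();

    // Key format: groupedServiceName#ip#port, one key per instance.
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    // Registers a beat; returns the beat it replaced (now stopped), or null if there was none.
    Beat add(String serviceName, Beat beat) {
        String key = buildKey(serviceName, beat.ip, beat.port);
        Beat previous = beats.remove(key);
        if (previous != null) {
            previous.stopped = true;
        }
        beats.put(key, beat);
        return previous;
    }

    int size() {
        return beats.size();
    }

    public static void main(String[] args) {
        BeatRegistrySketch registry = new BeatRegistrySketch();
        Beat first = new Beat("10.0.0.1", 8080);
        registry.add("DEFAULT_GROUP@@order-service", first);
        registry.add("DEFAULT_GROUP@@order-service", new Beat("10.0.0.1", 8080));
        System.out.println("first stopped: " + first.stopped + ", entries: " + registry.size());
    }
}
```

Marking the old beat as stopped matters because its task may already be scheduled; the flag lets that task exit without sending a duplicate heartbeat.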

The scheduled task simply runs BeatTask#run:

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            // Send the heartbeat
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            // If the server does not know this client, it responds with error code 20404
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    // Re-register the instance with the server
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

        }
        // Schedule the next heartbeat
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
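BeatTask is a one-shot task that schedules its own successor, which lets the server adjust the interval on every response. The sketch below isolates that pattern under simplifying assumptions: ReschedulingBeatSketch is a hypothetical class, and a LongSupplier stands in for sendBeat, returning the interval suggested by the server (a value of 0 or less means no suggestion).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Illustrative sketch only: ReschedulingBeatSketch is a hypothetical class, not part of Nacos.
final class ReschedulingBeatSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sketch");
        t.setDaemon(true); // do not keep the JVM alive just for heartbeats
        return t;
    });
    private final long defaultPeriodMillis;
    private volatile boolean stopped;

    ReschedulingBeatSketch(long defaultPeriodMillis) {
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    // Schedules the first beat after the default period.
    void start(LongSupplier sendBeat) {
        schedule(sendBeat, defaultPeriodMillis);
    }

    private void schedule(LongSupplier sendBeat, long delayMillis) {
        try {
            executor.schedule(() -> beatOnce(sendBeat), delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down concurrently; nothing left to schedule.
        }
    }

    private void beatOnce(LongSupplier sendBeat) {
        if (stopped) {
            return;
        }
        long next = defaultPeriodMillis;
        try {
            long interval = sendBeat.getAsLong();
            if (interval > 0) {
                next = interval; // the server-provided interval wins for the next round
            }
        } catch (RuntimeException e) {
            // A failed beat is not fatal: keep the default period and try again.
        }
        schedule(sendBeat, next);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        ReschedulingBeatSketch beats = new ReschedulingBeatSketch(100);
        beats.start(() -> {
            System.out.println("beat at " + System.currentTimeMillis());
            return 200L; // pretend the server asked for a 200 ms interval
        });
        Thread.sleep(1000);
        beats.stop();
    }
}
```

Compared with scheduleAtFixedRate, rescheduling from inside the task keeps the loop alive after a failed beat and allows the delay to change between runs.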

The core method is sendBeat:

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // Send a PUT request
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

Heartbeat request: PUT, URL: /nacos/v1/ns/instance/beat
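Two decisions in the heartbeat path are easy to miss: the full beat payload is sent only while light beat is disabled, and a 20404 response triggers re-registration. A small sketch of both follows; BeatRequestSketch and its method names are hypothetical, and the beat JSON is passed in as a ready-made string to avoid a JSON library dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: BeatRequestSketch is a hypothetical class, not part of Nacos.
final class BeatRequestSketch {
    // Response code the server returns when it does not know the instance.
    static final int RESOURCE_NOT_FOUND = 20404;

    // With light beat enabled the body stays empty; otherwise it carries the serialized beat.
    static Map<String, String> body(boolean lightBeatEnabled, String beatJson) {
        Map<String, String> body = new HashMap<>();
        if (!lightBeatEnabled) {
            body.put("beat", beatJson);
        }
        return body;
    }

    // The client registers the instance again when the server reports it as unknown.
    static boolean shouldReRegister(int responseCode) {
        return responseCode == RESOURCE_NOT_FOUND;
    }

    public static void main(String[] args) {
        String beatJson = "{\"ip\":\"10.0.0.1\",\"port\":8080}";
        System.out.println("full beat body: " + body(false, beatJson));
        System.out.println("light beat body: " + body(true, beatJson));
        System.out.println("re-register on 20404: " + shouldReRegister(20404));
    }
}
```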

3\. Method call diagram

[Figure: method call diagram for client-side registration and heartbeat]