一致性hash算法代码示例

时间:2022-10-30 09:49:38
package cn.de.common.dubbo;

import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang3.StringUtils;

import cn.de.AppConstant;
import cn.de.user.utils.MpqqTokenSecurityUtils;
import cn.de.user.utils.UserTokenSecurityUtils;
import cn.de.DataAccessReceiveService;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;

/**
 * 根据用户Id一致性hash负载均衡,用户未登录则随机选择
 * <pre>
 * http://www.zsythink.net/archives/1182
 * https://www.cnblogs.com/xrq730/p/5186728.html
 * </pre>
 * 
 * @author mac
 *
 *         日期2017年5月12日
 */
public class UserIdConsistentHashLoadBalance extends AbstractLoadBalance {

	/** 选择器集合。key为方法全路径名称,value为调用者集合 */
	private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
	/** 随机数 */
	private final Random random = new Random();

	/** 
	 * 使用一致性哈希选择一个调用者进行调用
	 * <pre>
	 * invokers:
	 * 192.168.0.1:8001
	 * 192.168.0.2:8001
	 * 192.168.0.3:8001
	 * 
	 * url:
	 * cn.de.Bill.findBillById
	 * 
	 * key:
	 * userId
	 * 
	 * </pre>
	 * 
	 * @param invokers 某一接口的调用者集合
	 * @param url 接口
	 * @param invocation 
	 */
	@SuppressWarnings("unchecked")
	@Override
	protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
		String userAuthToken = null == attrs ? null : attrs.get(AppConstant.RPC_TOKEN_PARM);
		String reqIp = null == attrs ? null : attrs.get(AppConstant.RPC_REQ_IP);
		String hashKey = null;
		if (StringUtils.isNotEmpty(userAuthToken)) {
			try {
				hashKey = UserTokenSecurityUtils.getUserIdByTokenStr(userAuthToken);
			} catch (Exception e) {
			}
		}
		//如果用户id不存在则取请求ip
		if (null == hashKey) {
			hashKey = reqIp;
		}

		//如果未获取得到用户id,随机取一个调用者
		if (null == hashKey) {
			int length = invokers.size(); // 总个数
			return invokers.get(random.nextInt(length));
		} else {
			//cn.de.User:0.0.1.findUserById
			String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
			//生成一个哈希码
			int identityHashCode = System.identityHashCode(invokers);
			//获取某个方法的所有调用者
			ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
			//不存在,则初始化选择器集合,并选择一个调用者
			if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
				//初始化选择器(哈希环)
				selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
				selector = (ConsistentHashSelector<T>) selectors.get(key);
			}
			//存在,则选择一个调用者
			return selector.select(hashKey);
		}
	}

	/**
	 * 一致性hash选择
	 * 
	 *
	 * 日期2017年5月12日
	 */
	private static final class ConsistentHashSelector<T> {

		/** 虚拟节点,key为哈希值,value为虚拟节点。解决哈希环倾斜问题,实现将n个节点尽可能多的、均匀的分布在哈希环上 */
		private final TreeMap<Long, Invoker<T>> virtualInvokers;

		/** 虚拟节点数 */
		private final int replicaNumber;

		/** 唯一HashCode */
		private final int identityHashCode;

		public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
			this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
			this.identityHashCode = System.identityHashCode(invokers);
			URL url = invokers.get(0).getUrl();
			/*
			 * 默认节点数不能过小也不能过大。
			 * 过小导致map中元素过少,根据hash值选择调用者时,选择同一个调用者概率较大,不能起到负载均衡效果;
			 * 过大导致map存储空间增大,且根据hash值选择调用者时,增加了将hash值与map中元素比较的次数;
			 * 
			 * 节点增加时,invokers增加,前n-1个调用者的hash值不变,第n个调用者的部分hash值添加到哈希环中,
			 * 如果请求不变,选择出的调用者可能变化,也可能不变。因为在哈希环中随机地添加了一些元素,再次查找比指定
			 * 哈希值大的元素时,查找结果可能变化,也可能不变。
			 * 
			 * 节点减少时,invokers减少,前n-1个调用者的hash值不变,第n个调用者的部分hash值从哈希环中移除,
			 * 如果请求不变,选择出的调用者可能变化,也可能不变。因为在哈希环中随机地移除了一些元素,再次查找比指定
			 * 哈希值大的元素时,查找结果可能变化,也可能不变。
			 */
			this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
			for (Invoker<T> invoker : invokers) {
				for (int i = 0; i < replicaNumber / 4; i++) {
					//进行一次md5编码,使value分布均匀
					byte[] digest = md5(invoker.getUrl().toFullString() + i);
					for (int h = 0; h < 4; h++) {
						//进行一次哈希编码,使value分布均匀
						long m = hash(digest, h);
						//key为0-max之间的随机的n个数
						virtualInvokers.put(m, invoker);
					}
				}
			}
		}

		public int getIdentityHashCode() {
			return identityHashCode;
		}

		/**
		 * 根据用户id一致性选择的核心算法
		 * 
		 * @param invocation
		 * @return
		 */
		public Invoker<T> select(String userId) {
			String key = userId;
			byte[] digest = md5(key);
			//哈希范围为0-max
			long hash = hash(digest, 0);
			Invoker<T> invoker = selectForKey(hash);
			return invoker;
		}

		/**
		 * 根据哈希值选择一个调用者
		 * 
		 * @param hash
		 * @return
		 */
		private Invoker<T> selectForKey(long hash) {
			Invoker<T> invoker;
			Long key = hash;
			//map中不存在指定的哈希,大概率是这样,因为map中元素个数很少,hash()生成的值范围很大
			if (!virtualInvokers.containsKey(key)) {
				//查找key值>=指定哈希的元素
				SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
				if (tailMap.isEmpty()) {
					//不存在,则取key值最小的元素。指定哈希大于map中最大的key时,出现该现象。形成一个哈希环
					key = virtualInvokers.firstKey();
				} else {
					//存在,取剩余元素的key最小的元素
					key = tailMap.firstKey();
				}
			}
			invoker = virtualInvokers.get(key);
			return invoker;
		}

		/**
		 * 计算哈希值,为0-max之间的一个数
		 * <pre>
		 * max为2^32-1 ?
		 * 不使用jdk提供的字符串的hashcode方法,其可能产生负数,且对于ip字符串,生成的hash值比较集中
		 * </pre>
		 * 
		 * @param digest
		 * @param number
		 * @return
		 */
		private long hash(byte[] digest, int number) {
			return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[0 + number * 4] & 0xFF)) & 0xFFFFFFFFL;
		}

		private byte[] md5(String value) {
			MessageDigest md5;
			try {
				md5 = MessageDigest.getInstance("MD5");
			} catch (NoSuchAlgorithmException e) {
				throw new IllegalStateException(e.getMessage(), e);
			}
			md5.reset();
			byte[] bytes = null;
			try {
				bytes = value.getBytes("UTF-8");
			} catch (UnsupportedEncodingException e) {
				throw new IllegalStateException(e.getMessage(), e);
			}
			md5.update(bytes);
			return md5.digest();
		}

	}

}

 

1 代码入口函数为doSelect,理解代码逻辑后可阅读类注释中的两篇博客,了解代码中未提到的一些问题。

2 代码为组内一个同事写的,其参考网上部分文章实现。