一致性hash算法简介与代码实现

时间:2022-10-23 09:48:39

一.简介:

一致性hash算法提出了在动态变化的Cache环境中,判定哈希算法好坏的四个定义:

1、平衡性(Balance)

2、单调性(Monotonicity)

3、分散性(Spread)

4、负载(Load)

普通的哈希算法(也称硬哈希)采用简单取模的方式,将机器进行散列,这在cache环境不变的情况下能取得让人满意的结果,但是当cache环境动态变化时,这种静态取模的方式显然就不满足单调性的要求(当增加或减少一台机子时,几乎所有的存储内容都要被重新散列到别的缓冲区中)。

一致性哈希算法的基本实现原理是将机器节点和key值都按照一样的hash算法映射到一个0~2^32的圆环上。当有一个写入缓存的请求到来时,计算Key值k对应的哈希值Hash(k),如果该值正好对应之前某个机器节点的Hash值,则直接写入该机器节点,如果没有对应的机器节点,则顺时针查找下一个节点,进行写入,如果超过2^32还没找到对应节点,则从0开始查找(因为是环状结构)。

实现代码1:

首先有一个设备类,定义了机器名和ip:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
public class Cache
{
    public String name;
    public String ipAddress;
}
View Code

主要的实现:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
public class Shard<T> { 
    //hash 算法并不是保证绝对的平衡,如果 cache 较少的话,对象并不能被均匀的映射到 cache 上,
    //所以增加虚拟节点
    private TreeMap<Long, T> nodes;
    private List<T> shards; //节点碎片
    private final int NODE_NUM = 10; // 每个机器节点关联的虚拟节点个数

    public Shard(List<T> shards) {
        this.shards = shards;
        init();
    }

    private void init() { 
        nodes = new TreeMap<Long, T>();
        for (int i = 0; i < shards.size(); i++) 
        { // 遍历真实节点
            final T shardInfo = shards.get(i);
            
            for (int n = 0; n < NODE_NUM; n++)
            {
                // 真实节点关联虚拟节点,真实节点是VALUE;
                nodes.put((long) Hash("SHARD-" + i + "-NODE-" + n), shardInfo);
            }
            System.out.println(shardInfo);
        }
    }

    public T getShardInfo(String key) {
        SortedMap<Long, T> tail = nodes.tailMap((long) Hash(key)); 
        if (tail.size() == 0) {
            return nodes.get(nodes.firstKey());
        }
        //找到最近的虚拟节点
        return tail.get(tail.firstKey());
    }
    
    /**
     * 改进的32位FNV算法,高离散
     * 
     * @param string
     *            字符串
     * @return int值
     */
    public static int Hash(String str) 
    {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (byte b : str.getBytes())
            hash = (hash ^ b) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash;
    }

}
View Code

测试:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
public class Test
{

    /**
     * @param args
     */
    public static void main(String[] args)
    {
        List<Cache> myCaches=new ArrayList<Cache>();
        Cache cache1=new Cache();
        cache1.name="COMPUTER1";
        Cache cache2=new Cache();
        cache2.name="COMPUTER2";
        myCaches.add(cache1);
        myCaches.add(cache2);
        
        
        Shard<Cache> myShard=new Shard<Cache>(myCaches);
        
        Cache currentCache=myShard.getShardInfo("info1");
        System.out.println(currentCache.name);
        
//        for(int i=0;i<20;i++)
//        {
//            String object=getRandomString(20);//产生20位长度的随机字符串
//            Cache currentCache=myShard.getShardInfo(object);
//            System.out.println(currentCache.name);
//        }
        
        
    }
    
    public static String getRandomString(int length) { //length表示生成字符串的长度
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";   
        Random random = new Random();   
        StringBuffer sb = new StringBuffer();   
        for (int i = 0; i < length; i++) {   
            int number = random.nextInt(base.length());   
            sb.append(base.charAt(number));   
        }   
        return sb.toString();   
     }   

}
View Code

上述实现略为简单,32FNV算法的一种改进

FNV哈希算法是一种高离散性的哈希算法,特别适用于哈希非常相似的字符串,例如:URL,IP,主机名,文件名等。

以下服务使用了FNV:
1、calc
2、DNS
3、mdbm key/value查询函数
4、数据库索引hash
5、主流web查询/索引引擎
6、高性能email服务
7、基于消息ID查询函数
8、auti-spam反垃圾邮件过滤器
9、NFS实现(比如freebsd 4.3, linux NFS v4)
10、Cohesia MASS project 
11、Ada 95的spellchecker
12、开源x86汇编器:flatassembler user-defined symbol hashtree
13、PowerBASIC
14、PS2、XBOX上的文本资源
15、非加密图形文件指纹
16、FRET
17、Symbian DASM
18、VC++ 2005的hash_map实现
19、memcache中的libketama
20、 PHP 5.x 
21、twitter中用于改进cache碎片
22、BSD IDE project
23、deliantra game server
24、 Leprechaun
25、IPv6流标签

实现代码2:

HashFunction:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
package ha;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashFunction {
    private MessageDigest md5 = null;

    public long hash(String key) {
        if (md5 == null) {
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("no md5 algorythm found");
            }
        }

        md5.reset();
        md5.update(key.getBytes());
        byte[] bKey = md5.digest();
        long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8)
                | (long) (bKey[0] & 0xFF);
        return res & 0xffffffffL;
    }

}
View Code

主要实现:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
package ha;

import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHash<T> {

    private final HashFunction hashFunction;
    private final int numberOfReplicas;  // 虚拟节点
    private final SortedMap<Long, T> circle = new TreeMap<Long, T>();   // 用来存储虚拟节点hash值 到真实node的映射

    public ConsistentHash(HashFunction hashFunction, int numberOfReplicas, Collection<T> nodes) {
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;

        for (T node : nodes) {
            add(node);
        }
    }

    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunction.hash(node.toString() + i), node); 
        }
    }

    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunction.hash(node.toString() + i));
        }
    }

    /**
     * 获得一个最近的顺时针节点
     * @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
     * @return
     */
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        long hash = hashFunction.hash((String) key);
        if (!circle.containsKey(hash)) {
            SortedMap<Long, T> tailMap = circle.tailMap(hash); ////返回此映射的部分视图,其键大于等于 hash
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }
    
    public long getSize() {
        return circle.size();
    }
    
}
View Code

测试:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
package ha;

import ha2.Cache;

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class MainApp {

    public static void main(String[] args) {

        Set<String> nodes = new HashSet<String>();
        nodes.add("127.0.0.1");
        nodes.add("192.144.111");
        nodes.add("127.0.0.3");
        ConsistentHash<String> consistentHash = new ConsistentHash<String>(new HashFunction(), 160, nodes);

        consistentHash.add("127.0.0.4");
//        consistentHash.add("E");
//        consistentHash.add("F");
//        consistentHash.add("G");
//        consistentHash.add("H");
//        consistentHash.add("I");
//        consistentHash.add("J");
//        consistentHash.add("K");
//        consistentHash.add("L");
//        consistentHash.add("M");
//        consistentHash.add("N");
//        consistentHash.add("O");
//        consistentHash.add("P");
//        consistentHash.add("R");
//        consistentHash.add("S");
//        consistentHash.add("T");
//        consistentHash.add("U");
//        consistentHash.add("V");
//        consistentHash.add("W");
        System.out.println(consistentHash.getSize());  //640

        System.out.println(consistentHash.get("127.0.0.1#111123"));
        System.out.println(consistentHash.get("127.0.0.33#111123"));//ip=>tcp
        
        Random random = new Random();

        int s = random.nextInt(100)%(100-0+1) + 0;
        System.out.println(s);
        
        int t = random.nextInt()*100;//key
        System.out.println(consistentHash.get(t+"#"));
        
        for(int i=0;i<20;i++)
        {
            String object = getRandomString(20);//产生20位长度的随机字符串
            System.out.println(consistentHash.get(object));
        }
        
    }

    public static String getRandomString(int length) { //length表示生成字符串的长度
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";   
        Random random = new Random();   
        StringBuffer sb = new StringBuffer();   
        for (int i = 0; i < length; i++) {   
            int number = random.nextInt(base.length());   
            sb.append(base.charAt(number));   
        }   
        return sb.toString();   
     }  
    
}
View Code

实现代码3:

 KetamaNodeLocator:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
/**
 * 在这个环中,节点之间是存在顺序关系的,
 * 所以TreeMap的key必须实现Comparator接口
 * @author 5  */
public final class KetamaNodeLocator {

    private TreeMap<Long, Node> ketamaNodes;  // 记录所有虚拟服务器节点,为什么是Long类型,因为Long实现了Comparable接口
    private HashAlgorithm hashAlg;
    private int numReps = 160;    // 每个服务器节点生成的虚拟服务器节点数量,默认设置为160

    public KetamaNodeLocator(List<Node> nodes, HashAlgorithm alg, int nodeCopies) {
        hashAlg = alg;
        ketamaNodes = new TreeMap<Long, Node>();

        numReps = nodeCopies;

        // 对所有节点,生成numReps个虚拟结点
        for (Node node : nodes) {
            // 每四个虚拟结点为一组,为什么这样?下面会说到
            for (int i = 0; i < numReps / 4; i++) {
                // 为这组虚拟结点得到惟一名称
                byte[] digest = hashAlg.computeMd5(node.getName() + i);
                /**
                 * Md5是一个16字节长度的数组,将16字节的数组每四个字节一组,
                 * 分别对应一个虚拟结点,这就是为什么上面把虚拟结点四个划分一组的原因
                 */
                for (int h = 0; h < 4; h++) {
                    // 对于每四个字节,组成一个long值数值,做为这个虚拟节点的在环中的惟一key
                    long m = hashAlg.hash(digest, h);

                    ketamaNodes.put(m, node);
                }
            }
        }
    }

    /**
     * 根据一个key值在Hash环上顺时针寻找一个最近的虚拟服务器节点
     * @param k
     * @return
     */
    public Node getPrimary(final String k) {
        byte[] digest = hashAlg.computeMd5(k);
        Node rv = getNodeForKey(hashAlg.hash(digest, 0));  // 为什么是0?猜测:0、1、2、3都可以,但是要固定
        return rv;
    }

    Node getNodeForKey(long hash) {
        final Node rv;
        Long key = hash;
        //如果找到这个节点,直接取节点,返回
        if (!ketamaNodes.containsKey(key)) {
            //得到大于当前key的那个子Map,然后从中取出第一个key,就是大于且离它最近的那个key  
            SortedMap<Long, Node> tailMap = ketamaNodes.tailMap(key);
            if (tailMap.isEmpty()) {
                key = ketamaNodes.firstKey();
            } else {
                key = tailMap.firstKey();
            }
            // For JDK1.6 version
            // key = ketamaNodes.ceilingKey(key);
            // if (key == null) {
            // key = ketamaNodes.firstKey();
            // }
        }

        rv = ketamaNodes.get(key);
        return rv;
    }
}
View Code

HashAlgorithm:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
/**
 * hash算法,通过MD5算法实现
 * MD5算法根据key生成一个16字节的序列,我们将其切成4段,将其中一段作为得到的Hash值
 * 在生成虚拟服务器节点中,我们将这四段分别作为四个虚拟服务器节点的唯一标识,即四个hash值
 * @author XXX
 */
public enum HashAlgorithm {

    /**
     * MD5-based hash algorithm used by ketama.
     */
    KETAMA_HASH;

    public long hash(byte[] digest, int nTime) {
        long rv = ((long) (digest[3+nTime*4] & 0xFF) << 24)
                | ((long) (digest[2+nTime*4] & 0xFF) << 16)
                | ((long) (digest[1+nTime*4] & 0xFF) << 8)
                | (digest[0+nTime*4] & 0xFF);
        
        /**
         * 实际我们只需要后32位即可,为什么返回一个long类型?
         * 因为Long实现了Comparable接口
         * Hash环上的节点之间是存在顺序关系的,必须实现Comparable接口
         */
        return rv & 0xffffffffL; /* Truncate to 32-bits */
    }

    /**
     * Get the md5 of the given key.
     */
    public byte[] computeMd5(String k) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = null;
        try {
            keyBytes = k.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + k, e);
        }
        
        md5.update(keyBytes);
        return md5.digest();
    }
}
View Code

测试:

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
/**
 * 分布平均性测试
 * @author  4  */
public class HashAlgorithmTest {

    static Random ran = new Random();
    
    // key的数量,key在实际客户端中是根据要存储的值产生的hash序列?
    private static final Integer EXE_TIMES = 100000;
    // 服务器节点的数量
    private static final Integer NODE_COUNT = 5;
    // 每个服务器节点生成的虚拟节点数量
    private static final Integer VIRTUAL_NODE_COUNT = 160;
    
    /**
     * 模拟EXE_TIMES个客户端数据存储时选择缓存服务器的情况,
     * 得到每个服务器节点所存储的值的数量,从而计算出值在服务器节点的分布情况
     * 判断该算法的"性能",正常情况下要求均匀分布
     * @param args
     */
    public static void main(String[] args) {
        HashAlgorithmTest test = new HashAlgorithmTest();
        
        // 记录每个服务器节点所分布到的key节点数量
        Map<Node, Integer> nodeRecord = new HashMap<Node, Integer>();
        
        // 模拟生成NODE_COUNT个服务器节点
        List<Node> allNodes = test.getNodes(NODE_COUNT);
        // 将服务器节点根据Hash算法扩展成VIRTUAL_NODE_COUNT个虚拟节点布局到Hash环上(实际上是一棵搜索树)
        // 由KetamaNodeLocator类实现和记录
        KetamaNodeLocator locator = 
                new KetamaNodeLocator(allNodes, HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);
        
        // 模拟生成随机的key值(由长度50以内的字符组成)
        List<String> allKeys = test.getAllStrings();
        for (String key : allKeys) {
            // 根据key在Hash环上找到相应的服务器节点node
            Node node = locator.getPrimary(key);
            
            // 记录每个服务器节点分布到的数据个数
            Integer times = nodeRecord.get(node);
            if (times == null) {
                nodeRecord.put(node, 1);
            } else {
                nodeRecord.put(node, times + 1);
            }
        }
        
        // 打印分布情况
        System.out.println("Nodes count : " + NODE_COUNT + ", Keys count : " + EXE_TIMES + ", Normal percent : " + (float) 100 / NODE_COUNT + "%");
        System.out.println("-------------------- boundary  ----------------------");
        for (Map.Entry<Node, Integer> entry : nodeRecord.entrySet()) {
            System.out.println("Node name :" + entry.getKey() + " - Times : " + entry.getValue() + " - Percent : " + (float)entry.getValue() / EXE_TIMES * 100 + "%");
        }
        
    }
    
    
    /**
     * Gets the mock node by the material parameter
     * 
     * @param nodeCount 
     *         the count of node wanted
     * @return
     *         the node list
     */
    private List<Node> getNodes(int nodeCount) {
        List<Node> nodes = new ArrayList<Node>();
        
        for (int k = 1; k <= nodeCount; k++) {
            Node node = new Node("node" + k);
            nodes.add(node);
        }
        
        return nodes;
    }
    
    /**
     *    All the keys    
     */
    private List<String> getAllStrings() {
        List<String> allStrings = new ArrayList<String>(EXE_TIMES);
        
        for (int i = 0; i < EXE_TIMES; i++) {
            allStrings.add(generateRandomString(ran.nextInt(50)));
        }
        
        return allStrings;
    }
    
    /**
     * To generate the random string by the random algorithm
     * <br>
     * The char between 32 and 127 is normal char
     * 
     * @param length
     * @return
     */
    private String generateRandomString(int length) {
        StringBuffer sb = new StringBuffer(length);
        
        for (int i = 0; i < length; i++) {
            sb.append((char) (ran.nextInt(95) + 32));
        }
        
        return sb.toString();
    }
}
View Code

 HashAlgorithmPercentTest

一致性hash算法简介与代码实现一致性hash算法简介与代码实现
package ConsistentHashing;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * ½ÚµãÔöɾ²âÊÔ
 * @author Lenovo
 */
public class HashAlgorithmPercentTest {

    static Random ran = new Random();
    
    /** key's count */
    private static final Integer EXE_TIMES = 100000;
    
    private static final Integer NODE_COUNT = 50;
    
    private static final Integer VIRTUAL_NODE_COUNT = 160;
    
    static List<String> allKeys = null;
    
    static {
        allKeys = getAllStrings();
    }
    
    public static void main(String[] args) {
        
        Map<String, List<Node>> map = generateRecord();
        
        List<Node> allNodes = getNodes(NODE_COUNT);
        System.out.println("Normal case : nodes count : " + allNodes.size());
        call(allNodes, map);
        
        allNodes = getNodes(NODE_COUNT + 8);
        System.out.println("Added case : nodes count : " + allNodes.size());
        call(allNodes, map);
        
        allNodes = getNodes(NODE_COUNT - 10);
        System.out.println("Reduced case : nodes count : " + allNodes.size());
        call(allNodes, map);
        
        int addCount = 0;
        int reduceCount = 0;
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            List<Node> list = entry.getValue();
            
            if (list.size() == 3) {
                if (list.get(0).equals(list.get(1))) {
                    addCount++;
                }
                
                if (list.get(0).equals(list.get(2))) {
                    reduceCount++;
                }
            } else {
                System.out.println("It's wrong size of list, key is " + entry.getKey() + ", size is " + list.size());
            }
        }
        
        System.out.println(addCount + "   ---   " + reduceCount);
        
        System.out.println("Same percent in added case : " + (float) addCount * 100/ EXE_TIMES + "%");
        System.out.println("Same percent in reduced case : " + (float) reduceCount * 100/ EXE_TIMES + "%");
        
    }
    
    private static void call(List<Node> nodes, Map<String, List<Node>> map) {
        KetamaNodeLocator locator = new KetamaNodeLocator(nodes, HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);
        
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            Node node = locator.getPrimary(entry.getKey());
            
            if (node != null) {
                List<Node> list = entry.getValue();
                list.add(node);
            }
        }
    }
    
    private static Map<String, List<Node>> generateRecord() {
        Map<String, List<Node>> record = new HashMap<String, List<Node>>(EXE_TIMES);
        
        for (String key : allKeys) {
            List<Node> list = record.get(key);
            if (list == null) {
                list = new ArrayList<Node>();
                record.put(key, list);
            }
        }
        
        return record;
    }
    
    
    /**
     * Gets the mock node by the material parameter
     * 
     * @param nodeCount 
     *         the count of node wanted
     * @return
     *         the node list
     */
    private static List<Node> getNodes(int nodeCount) {
        List<Node> nodes = new ArrayList<Node>();
        
        for (int k = 1; k <= nodeCount; k++) {
            Node node = new Node("node" + k);
            nodes.add(node);
        }
        
        return nodes;
    }
    
    /**
     *    All the keys    
     */
    private static List<String> getAllStrings() {
        List<String> allStrings = new ArrayList<String>(EXE_TIMES);
        
        for (int i = 0; i < EXE_TIMES; i++) {
            allStrings.add(generateRandomString(ran.nextInt(50)));
        }
        
        return allStrings;
    }
    
    /**
     * To generate the random string by the random algorithm
     * <br>
     * The char between 32 and 127 is normal char
     * 
     * @param length
     * @return
     */
    private static String generateRandomString(int length) {
        StringBuffer sb = new StringBuffer(length);
        
        for (int i = 0; i < length; i++) {
            sb.append((char) (ran.nextInt(95) + 32));
        }
        
        return sb.toString();
    }
}
View Code

参考:

https://blog.helong.info/blog/2015/03/13/jump_consistent_hash/

http://blog.codinglabs.org/articles/consistent-hashing.html