Twitter分布式自增ID算法Snowflake

时间:2022-12-21 22:12:42

在分布式系统中,需要生成全局UID的场合还是比较多的,twitter的snowflake解决了这种需求,实现也还是很简单的,除去配置信息,核心代码就是毫秒级时间41位 机器ID 10位 毫秒内序列12位。

10---0000000000 0000000000 0000000000 0000000000 0 --- 00000 ---00000 ---000000000000
在上面的字符串中,第一位为未使用(实际上也可作为long的符号位),接下来的41位为毫秒级时间,然后5位datacenter标识位,5位机器ID(并不算标识符,实际是为线程标识),然后12位该毫秒内的当前毫秒内的计数,加起来刚好64位,为一个Long型。

这样的好处是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由datacenter和机器ID作区分),并且效率较高,经测试,snowflake每秒能够产生26万ID左右,完全满足需要。

package com.twitter.service.snowflake

import com.twitter.ostrich.stats.Stats
import com.twitter.service.snowflake.gen._
import java.util.Random
import com.twitter.logging.Logger


class IdWorker(val workerId: Long, val datacenterId: Long, private val reporter: Reporter, var sequence: Long = 0L)
extends Snowflake.Iface {
private[this] def genCounter(agent: String) = {
Stats.incr("ids_generated")
Stats.incr("ids_generated_%s".format(agent))
}
private[this] val exceptionCounter = Stats.getCounter("exceptions")
private[this] val log = Logger.get
private[this] val rand = new Random

val twepoch = 1288834974657L

//机器标识位数

private[this] val workerIdBits = 5L

//数据中心标识位数
private[this] val datacenterIdBits = 5L

//机器ID最大值
private[this] val maxWorkerId = -1L ^ (-1L << workerIdBits)

//数据中心ID最大值
private[this] val maxDatacenterId = -1L ^ (-1L << datacenterIdBits)

//毫秒内自增位
private[this] val sequenceBits = 12L

//机器ID偏左移12位

private[this] val workerIdShift = sequenceBits

//数据中心ID左移17位
private[this] val datacenterIdShift = sequenceBits workerIdBits

//时间毫秒左移22位
private[this] val timestampLeftShift = sequenceBits workerIdBits datacenterIdBits
private[this] val sequenceMask = -1L ^ (-1L << sequenceBits)

private[this] var lastTimestamp = -1L

// sanity check for workerId
if (workerId > maxWorkerId || workerId < 0) {
exceptionCounter.incr(1)
throw new IllegalArgumentException("worker Id can't be greater than %d or less than 0".format(maxWorkerId))
}

if (datacenterId > maxDatacenterId || datacenterId < 0) {
exceptionCounter.incr(1)
throw new IllegalArgumentException("datacenter Id can't be greater than %d or less than 0".format(maxDatacenterId))
}

log.info("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d",
timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId)

def get_id(useragent: String): Long = {
if (!validUseragent(useragent)) {
exceptionCounter.incr(1)
throw new InvalidUserAgentError
}

val id = nextId()
genCounter(useragent)

reporter.report(new AuditLogEntry(id, useragent, rand.nextLong))
id
}

def get_worker_id(): Long = workerId
def get_datacenter_id(): Long = datacenterId
def get_timestamp() = System.currentTimeMillis

protected[snowflake] def nextId(): Long = synchronized {
var timestamp = timeGen()

//时间错误

if (timestamp < lastTimestamp) {
exceptionCounter.incr(1)
log.error("clock is moving backwards. Rejecting requests until %d.", lastTimestamp);
throw new InvalidSystemClock("Clock moved backwards. Refusing to generate id for %d milliseconds".format(
lastTimestamp - timestamp))
}

if (lastTimestamp == timestamp) {
//当前毫秒内,则 1
sequence = (sequence 1) & sequenceMask
if (sequence == 0) {
//当前毫秒内计数满了,则等待下一秒
timestamp = tilNextMillis(lastTimestamp)
}
} else {
sequence = 0
}

lastTimestamp = timestamp
//ID偏移组合生成最终的ID,并返回ID

((timestamp - twepoch) << timestampLeftShift) |
(datacenterId << datacenterIdShift) |
(workerId << workerIdShift) |
sequence
}

//等待下一个毫秒的到来

protected def tilNextMillis(lastTimestamp: Long): Long = {
var timestamp = timeGen()
while (timestamp <= lastTimestamp) {
timestamp = timeGen()
}
timestamp
}

protected def timeGen(): Long = System.currentTimeMillis()

val AgentParser = """([a-zA-Z][a-zA-Z\-0-9]*)""".r

def validUseragent(useragent: String): Boolean = useragent match {
case AgentParser(_) => true
case _ => false
}
}