Redis源码学习-Master&Slave的命令交互

时间:2022-09-23 13:58:12

0. 写在前面

Version Redis2.2.2

Redis中可以支持主从结构,本文主要从master和slave的心跳机制出发(PING),分析redis的命令行交互。

在Redis中,server为每个连接建立一个redisClient数据对象,来描述对应的连接。其中,redisClient为命令交互设置了缓冲区。querybuf用于存储客户端送过来的命令,buf和reply是用于应答的缓冲。querybuf是在文件事件readQueryFromClient中被填充,每次填充的最大字节数默认为1024B。而应答缓冲区是由addReply()函数填充,并由文件事件sendReplyToClient中发送给客户端。具体数据流如图1所示。MasterPorcess与SlaveProcess进行命令交互。其中,蓝色矩形框代表函数,白色矩形框代表数据,曲线描述数据流,折线描述数据间的从属关系。

Redis源码学习-Master&Slave的命令交互

图1. Master&Slave交互的数据流(蓝色矩形框代表函数,白色矩形框代表数据,曲线描述数据流,折线描述数据间的从属关系)

1. 相关数据结构

typedef struct redisClient {
int fd; //connect fd
...
sds querybuf; //命令缓冲区,由readQueryFromClient()事件进行填充(sds equals to char*)
int argc; //for command;记录参数个数
robj **argv; //for command;记录命令行参数
int reqtype; //命令解析协议:INLINE or MULTIBULK
...
time_t lastinteraction; /* 最近交互时间 */
...
list *reply; //Replay object list
/* Response buffer */
char buf[REDIS_REPLY_CHUNK_BYTES]; //Reply buffer,由addReply()函数进行填充
int bufpos; //记录buf已填充的长度
int sentlen; //Replay阶段,记录当前buf已发送了多少字节
} redisClient; struct redisServer {
...
list *clients;
dict *commands; /* Command table hahs table */
...
list *slaves, *monitors; //Master : slave链表
char neterr[ANET_ERR_LEN];
aeEventLoop *el; //Event list
int cronloops; //ServerCorn 执行次数
...
redisClient *master; //Slave :记录 master 的连接信息的client
int replstate; //Slave :当前的状态
...
}; struct redisCommand readonlyCommandTable[] = {
...
{"sync",syncCommand,1,0,NULL,0,0,0},
...
{"ping",pingCommand,1,0,NULL,0,0,0},
...
}

2. query的读取和命令的解析

从图1可以看出,命令交互数据query的读取是在文件事件readQueryFromClient中填充到c->querybuf中。之后,querybuf由函数processInputBuffer进行命令的解析。命令的解析过程如图2所示。在函数processInputBuffer中,将缓存与querybuf中的所有命令(命令间按\n\r分隔)进行解析。之后,查询命令hashtabe查找相关命令函数。最后调用相应命令hander执行命令。

Redis源码学习-Master&Slave的命令交互

图2.querybuf的解析

具体代码分析如下:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
char buf[REDIS_IOBUF_LEN];
int nread;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask); nread = read(fd, buf, REDIS_IOBUF_LEN);
...check...
if (nread) {
c->querybuf = sdscatlen(c->querybuf,buf,nread);
c->lastinteraction = time(NULL);//更新时间戳
} else {
return;
}
processInputBuffer(c);//处理client传输过来的数据
} void processInputBuffer(redisClient *c) {
/* 执行querybub中的所有命令*/
while(sdslen(c->querybuf)) {
...check...
/*判定命令的解析协议 */
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = REDIS_REQ_MULTIBULK;
} else {
c->reqtype = REDIS_REQ_INLINE;//按行解析
}
} if (c->reqtype == REDIS_REQ_INLINE) {
/*processInlineBuffer:
1. 取出c->querybuf起始端到\r\n位置的字符串,更新c->querybuf
2. 将取出的字符串按照“ ”空格进行分段解析,得到命令及其参数
格式为: argc,*argv[],其中argv[0]为命令,argv[1~argc-1]为参数*/
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
...
} /* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
if (processCommand(c) == REDIS_OK) //执行命令
resetClient(c);
}
}
} /* If this function gets called we already read a whole
* command, argments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*/
int processCommand(redisClient *c) {
struct redisCommand *cmd; ... /* Now lookup the command and check ASAP about trivial error conditions
* such wrong arity, bad command name and so forth. */
cmd = lookupCommand(c->argv[0]->ptr); ...check... /* Exec the command */
if (c->flags & REDIS_MULTI &&
cmd->proc != execCommand && cmd->proc != discardCommand &&
cmd->proc != multiCommand && cmd->proc != watchCommand)
{
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
if (server.vm_enabled && server.vm_max_threads > 0 &&
blockClientOnSwappedKeys(c,cmd))
return REDIS_ERR;
call(c,cmd); //执行命令
}
return REDIS_OK;
} /* Call() is the core of Redis execution of a command */
void call(redisClient *c, struct redisCommand *cmd) {
long long dirty; dirty = server.dirty;
cmd->proc(c); //执行命令
dirty = server.dirty-dirty; if (server.appendonly && dirty)
feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
server.stat_numcommands++;
}

3. 具体命令的执行(ping命令)

其中,addReply将相关命令执行结果放入client的reply缓冲区中。reply缓冲区的发送时机是在事件sendReplyToClient中进行。

#define REDIS_STRING 0
shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
//{"ping",pingCommand,1,0,NULL,0,0,0}
void pingCommand(redisClient *c) {
addReply(c,shared.pong); //ping的回复是pong,打乒乓,呵呵
} //将命令执行的返回结构写入c->buf 或者 c->reply
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;//创建event sendReplyToClient
redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); /* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the
* refcount field of the object if it's not needed.
*
* If the encoding is RAW and there is room in the static buffer
* we'll be able to send the object to the client without
* messing with its page. */
if (obj->encoding == REDIS_ENCODING_RAW) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
} else {
/* FIXME: convert the long into string and use _addReplyToBuffer()
* instead of calling getDecodedObject. As this place in the
* code is too performance critical. */
obj = getDecodedObject(obj);
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
decrRefCount(obj);
}
}

4. reply缓冲区数据的发送

将c->buf 和 c->reply中的数据发送到客户端(slave or master)。在每次文件事件中发送所有的reply缓冲区中的数据。

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
robj *o;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask); while(c->bufpos > 0 || listLength(c->reply)) {
if (c->bufpos > 0) {
//发送c->buf中的数据
if (c->flags & REDIS_MASTER) {
/* Don't reply to a master */
nwritten = c->bufpos - c->sentlen;
} else {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
}
c->sentlen += nwritten;
totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if (c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
//发送c->reply中的数据
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr); if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
} if (c->flags & REDIS_MASTER) {
/* Don't reply to a master */
nwritten = objlen - c->sentlen;
} else {
nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
if (nwritten <= 0) break;
}
c->sentlen += nwritten;
totwritten += nwritten; /* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
}
}
/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interfae) */
if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
}
...check...
if (totwritten > 0) c->lastinteraction = time(NULL);
//??Why delete file event of write ? ?
if (listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */
if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
}
}

5. 总结

命令行交互过程中,1.为每个连接有相应的数据进行描述(redisClient),这样便于连接的管理。2.命令行交互中,引入命令缓冲区querybuf,这样可以延时处理命令,这在事件轮询机制中,是至关重要的。

原文链接 http://blog.csdn.net/ordeder/article/details/16105345

Redis源码学习-Master&Slave的命令交互的更多相关文章

  1. Redis源码学习:字符串

    Redis源码学习:字符串 1.初识SDS 1.1 SDS定义 Redis定义了一个叫做sdshdr(SDS or simple dynamic string)的数据结构.SDS不仅用于 保存字符串, ...

  2. redis源码学习之slowlog

    目录 背景 环境说明 redis执行命令流程 记录slowlog源码分析 制造一条slowlog slowlog分析 1.slowlog如何开启 2.slowlog数量限制 3.slowlog中的耗时 ...

  3. &lowbar;&lowbar;sync&lowbar;fetch&lowbar;and&lowbar;add函数&lpar;Redis源码学习&rpar;

    __sync_fetch_and_add函数(Redis源码学习) 在学习redis-3.0源码中的sds文件时,看到里面有如下的C代码,之前从未接触过,所以为了全面学习redis源码,追根溯源,学习 ...

  4. Redis源码学习:Lua脚本

    Redis源码学习:Lua脚本 1.Sublime Text配置 我是在Win7下,用Sublime Text + Cygwin开发的,配置方法请参考<Sublime Text 3下C/C++开 ...

  5. 柔性数组&lpar;Redis源码学习&rpar;

    柔性数组(Redis源码学习) 1. 问题背景 在阅读Redis源码中的字符串有如下结构,在sizeof(struct sdshdr)得到结果为8,在后续内存申请和计算中也用到.其实在工作中有遇到过这 ...

  6. redis源码学习之工作流程初探

    目录 背景 环境准备 下载redis源码 下载Visual Studio Visual Studio打开redis源码 启动过程分析 调用关系图 事件循环分析 工作模型 代码分析 动画演示 网络模块 ...

  7. redis源码学习之lua执行原理

    聊聊redis执行lua原理 从一次面试场景说起   "看你简历上写的精通redis" "额,还可以啦" "那你说说redis执行lua脚本的原理&q ...

  8. Redis 源码学习之 Redis 事务Nosql

    Redis事务提供了一种将多个命令请求打包,然后一次性.按照顺序地执行多个命令的机制,并且在事务执行的期间,服务器不会中断事务而去执行其他不在事务中的命令请求,它会把事务中所有的命令都执行完毕才会去执 ...

  9. Redis源码解析:10scan类命令的实现

    像keys或者smembers命令,需要遍历数据集合中的所有元素.在一个大的数据库中使用,可能会阻塞服务器较长的一段时间,造成性能问题,因此不适用与生产环境. 在Redis2.8.0中引入了scan类 ...

随机推荐

  1. 一组神奇的 3D Gif 动图

    本文由 极客范 - 黄利民 翻译自 mymodernmet.欢迎加入极客翻译小组,同我们一道翻译与分享.转载请参见文章末尾处的要求. 虽然 gif 动图/动画似乎是无处不在现在了,但有些聪明人已经把 ...

  2. Extjs 数据代理

    Ext.data.proxy.Proxy 代理类的根类 客户端代理: 1.LocalStorageProxy:将数据存储在localStorage中,此种方式可以持久的将数据存储在客户端 要使用代理, ...

  3. 「mysql优化专题」什么是慢查询?如何通过慢查询日志优化?(10)

    日志就跟人们写的日记一样,记录着过往的事情.但是人的日记是主观的(记自己想记的内容),而数据库的日志是客观的,根据记录内容分为以下好几种日志(技术文): a.错误日志:记录启动.运行或停止mysqld ...

  4. linux下监控某个目录是否被更改

    需求:对一个目录(比如/data/test)进行监控,当这个目录下文件或子目录出现变动(如修改.创建.删除.更名等操作)时,就发送邮件!针对上面的需求,编写shell脚本如下: [root@cento ...

  5. OpenFace的一些了解

    1.OpenFace内4个样例代码 配置学习了两个 其一: Ubantu 基本命令 Docker 安装方式.发布网站方式.查看验证安装结果命令 Openface 基本demo 实现方式.和基本原理 其 ...

  6. Android 耳机插入过程分析

    Android 耳机插入过程分析 参考链接: https://www.jianshu.com/p/d82a8dabb3e7 初始化: 10-26 07:40:43.932 1414 1414 I Sy ...

  7. iOS线程开发小结

    在iOS开发线程操作时,一般方法名决定是否开启新线程(async,sync),队列类型(全局队列,串行队列)决定开启多少条线程 1.快速线程调用 *开启后台线程执行任务 [self performSe ...

  8. C语言高级-结构,共用体,文件,链表

    C语言结构 标准声明方式 struct student{        int age;        char sex;    }; 这个可以在main函数中定义:  struct student ...

  9. SAP RANG语法

    Range  和select-option 的变量是差不多的 sign = 'I' 或 sign = 'E' 是指INCLUED 和 EXCLUDE OPTION = 'EQ' 或其他操作符.. LO ...

  10. sscanf&comma;sprintf用法

    #include<string.h> #include<stdio.h> int main() { ],sztime1[],sztime2[]; sscanf("12 ...