RabbitMQ——消息存储

时间:2024-03-22 22:21:28

【概述】

前一篇文章中提到了消息可存储在队列索引或消息存储中,对于消息存储的方式,整体框架大概如下图所示:

RabbitMQ——消息存储

rabbitmq启动后针对每个vhost会启动两个进程:msg_store_persistent和msg_store_transient,这两个进程作为服务端负责将消息写入文件,从文件读取消息。其中msg_store_persistent负责将持久化消息写入文件与从文件中读取消息,msg_store_transient负责非持久化消息写入文件与从文件中读取消息。

每个队列则看成是一个客户端,当生产者发送的消息达到队列时,向服务端请求写;当消费者在队列进行消息消费时,直接从文件读取或向服务端请求读取消息内容。

另外,msg_store_persistent与msg_store_transient均有一个对应的GC进程,负责进行文件的合并与删除,减少磁盘空间的占用。


【ETS表】

rabbitmq内部维护了多张表,这些表有的是记录消息与存储文件的相关信息:例如消息存储在哪个文件中、在文件中的偏移位置、消息的长度、引用次数、总共有多少个文件、文件中有多少有效消息、左右关联的文件信息等;有的则是用于操作记录、消息缓存等,以便于减少对文件的实际读写操作。具体的表有:

1)flying_ets:用于消息write、remove的引用计数

RabbitMQ——消息存储

CRef:客户端对应的reference,每个客户端唯一

MsgId:消息的唯一ID

Count:引用计数

2)cur_file_cache_ets:用于当前正在写的文件的消息缓存

RabbitMQ——消息存储

MsgId:消息的唯一ID

Msg:消息内容

Count:消息的引用计数

3)msg_store_ets_index:消息在文件中的索引信息

RabbitMQ——消息存储

MsgId:消息的唯一ID

RefCount:消息的计数

File:消息存储的文件名

Offset:消息在文件中的偏移

TotalSize:消息的长度

4)file_summary_ets:文件的描述信息

RabbitMQ——消息存储

File:存储的文件名

ValidTotalSize:存储文件的有效数据大小

Left:位于该文件左边的文件

Right:位于该文件右边的文件

FileSize:文件总的大小

Locked:文件锁定标记,文件合并或删除前会进行锁定

Readers:当前正在读该文件的客户端个数


【重要流程】

1) 消息的写流程

RabbitMQ——消息存储

2)消息的删除流程

RabbitMQ——消息存储

3)消息的读流程

RabbitMQ——消息存储

上面仅描述了每个操作的关键流程,但实际实现中有很多细节处理,以达到最优效果。例如:如果同一客户端对一条消息先后进行写、删除操作,虽然会先后向服务端发送两个请求,但可能服务端在处理写请求时,客户端已经完成删除操作,此时flying_ets表中对应消息的引用计数为0,那么服务端对该写请求也不会进行实际处理。

rabbitmq充分利用了前面提到的几个ets表进行了读写操作的优化处理,但也有需要注意的地方:当前正在写的文件,对应存储的消息是会缓存在cur_file_cache_ets表中,当前写的文件关闭后,缓存表中的数据也随之清除。对于非正在写的文件中的消息的读操作,需要打开消息所存储的文件,然后seek到指定位置并读取对应长度的内容,并且读取后的消息是不会在任何地方进行缓存的。那么极端情况下,如果不同客户端先后来读同一条消息,会重复的进行读操作(即重复的打开这个文件,seek到指定位置,然后读取指定长度的内容,最终关闭该文件)。


【文件格式&文件合并】

消息存储对应的文件后缀名为rdq,文件名从0开始递增,文件的内部格式是这样的:

RabbitMQ——消息存储

同一条消息只会存储一次,通过msg_store_ets_index表来记录被引用了多少次,每个文件的信息则记录在file_summary_ets表中。

当消息被删除时,并不会直接删除文件中的内容,仅仅是在msg_store_ets_index中删除对应的记录,同时更新文件的相关信息。

服务端每处理完一个请求后,检查是否符合合并文件的条件,如果符合条件则先对该文件标记为锁定,然后通知GC进程进行文件合并处理,具体为先对左边文件中的有效数据进行整理,再将右边文件中的有效数据写入到左边文件,最后更新相关的表信息并删除右边的文件。

RabbitMQ——消息存储