四十九、进程间通信——System V IPC 之消息队列

时间:2022-04-02 22:19:02

49.1 System V IPC 介绍

49.1.1 System V IPC 概述

  • UNIX 系统存在信号、管道和命名管道等基本进程间通讯机制
  • System V 引入了三种高级进程间通信机制
    • 消息队列、共享内存和信号量
  • IPC 对象(消息队列、共享内存和信号量)存在于内核中而不是文件系统中,由用户控制释放(用户管理 ipc 对象的生命周期),不像管道的释放由内核控制
  • IPC 对象通过其标识符来引用和访问,所有 IPC 对象在内核空间中有唯一性标识 ID,在用户空间中的唯一性标识称为 key。
  • Linux IPC 继承自 System V IPC

49.1.2 System V IPC 对象的访问

  • IPC 对象是全局对象
    • 可用 ipcs, ipcm 等命令查看或者删除
  • 每个 IPC 对象都由 get 函数创建
    • msgger, shmget, semget
    • 调用 get 函数时必须指定关键字 key

49.1.3 IPC 对象的权限和所有者结构体

  四十九、进程间通信——System V IPC 之消息队列

49.1.4 消息队列

  • 消息队列是内核中的一个链表
  • 用户进程将数据传输到内核后,内核重新添加一些如用户 ID、组 ID、读写进程的 ID 和优先级等相关信息后,并打成一个数据包称为消息
  • 允许一个或者多个进程往消息队列中写消息和读消息,但一个消息只能被一个进程读取,读取完毕后就自动删除
  • 消息队列具有一定的 FIFO 的特性,消息可以按照顺序发送到队列中,也可以几种不同的方式从队列中读取。每一个消息队列在内核中用一个唯一的 IPC 标识 ID 表示
  • 消息队列的实现包括创建和打开队列、发送消息、读取消息和控制消息队列四种操作

49.1.5 消息队列的属性、打开、创建和控制

(1)消息队列属性

  四十九、进程间通信——System V IPC 之消息队列

(2)打开或创建消息队列

 #include <sys/msg.h>
int msgget(key_t key, int flag);
  • 函数参数:
    • key:用户指定的消息队列键值
    • flag:IPC_CREAT、IPC_EXCL 等权限组合
  • 返回值:
    • 成功,则返回内核中消息队列的标识 ID,出错,则返回 -1
  • 若创建消息队列,key 可指定键值,也可将之设置为 IPC_PRIVATE。若打开进行查询,则 key 不能为 0,必须是一个非零的值,否则是查询不到的。

(3)消息队列控制

  四十九、进程间通信——System V IPC 之消息队列

  • 函数参数:
    • msgid:消息队列 ID
    • buf:消息队列属性指针
    • cmd
      • IPC_STAT:获取消息队列的属性,取此队列的 msqid_ds 结构,并将其存放在 buf 指向的结构中
      • IPC_SET:设置属性,按由 buf 指向的结构中的值,设置与此队列相关的结构中的字段
      • IPC_RMID:删除队列,从系统中删除该消息队列以及仍在该队列上的所有数据

(4)消息队列的发送和接收

  四十九、进程间通信——System V IPC 之消息队列

  msgp 参数是一个指向 msgbuf 的指针

  四十九、进程间通信——System V IPC 之消息队列

  • 函数参数
    • msgid:消息队列 ID
    • msgq:最终要发送(接受)的数据

      • mtype:指消息的类型,它由一个整数来代表,并且它只能是大于 0 的整数;可以通过此数据接受特定的消息
      • mtext:消息数据本身,可以是文本类型,也可以是二进制等
      • 在 Linux 中,消息的最大长度是 4096 个字节,其中包括 mtype,它占有 4 个字节
      • 结构体 msgbuf 用户可自定义,但是第一个成员必须是 mtype
    • msgsz:指定消息的大小,不包括 mtype 的大小,仅是消息的大小
      • msgsz = sizeof(struct mymesg) - sizeof(long)
    • msgtyp:消息类型
      • msgtyp = 0:获得消息队列中第一个消息
      • msgtyp > 0:获得消息队列中类型为 type 的第一个消息
      • msgtyp < 0:获得消息队列中小于或等于 msgtyp 绝对值的消息(类型最小的)
    • msgflag:
      • 0:阻塞
      • IPC_NOWAIT:类似于文件 I/O 的非阻塞 I/O 标识
      • 若消息队列已满(或者是队列中的消息总数等于系统限制值,或队列中的字节总数等于系统限制值),则指定 IPC_NOWAIT 使得 msgsnd 立即出错返回 EAGAIN。如果指定0,则进程:
        • 阻塞直到有空间可以容纳要发送的消息
        • 或从系统中删除了此队列
        • 或捕捉到一个信号,并从信号处理程序返回

49.2 例子

49.2.1 消息队列发送

  msg_snd.c

 #include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h> typedef struct {
long type;///< 消息数据
int start;///< 消息数据本身(包括start 和 end)
int end;
}MSG; /** 往消息队列中发送消息 */
int main(int argc, char *argv[])
{
if(argc < ){
printf("usage: %s key\n", argv[]);
return ;
} key_t key = atoi(argv[]);
printf("key: %d\n", key); /** 创建消息队列 */
int msg_id;
if((msg_id = msgget(key, IPC_CREAT | IPC_EXCL | )) < ){
perror("msgget error");
} printf("msg id: %d\n", msg_id); /** 定义要发送的消息 */
MSG m1 = {, , };
MSG m2 = {, , };
MSG m3 = {, , };
MSG m4 = {, , };
MSG m5 = {, , }; /** 发送消息到消息队列 */
if(msgsnd(msg_id, &m1, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < ){
perror("msgsnd error");
} if(msgsnd(msg_id, &m2, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < ){
perror("msgsnd error");
} if(msgsnd(msg_id, &m3, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < ){
perror("msgsnd error");
} if(msgsnd(msg_id, &m4, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < ){
perror("msgsnd error");
} if(msgsnd(msg_id, &m5, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < ){
perror("msgsnd error");
} /** 发送后去获得消息队列中消息的总数 */
struct msqid_ds ds;
if(msgctl(msg_id, IPC_STAT, &ds) < ){
perror("msgctl error");
} printf("msg total: %ld\n", ds.msg_qnum); return ;
}

  编译运行:

  四十九、进程间通信——System V IPC 之消息队列

  四十九、进程间通信——System V IPC 之消息队列

  删除消息队列:

  四十九、进程间通信——System V IPC 之消息队列

49.2.2 消息队列接收

  msg_rcv.c

 #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h> typedef struct {
long type;
int start;
int end;
}MSG; int main(int argc, char *argv[])
{
if(argc < ){
printf("usage: %s key type\n", argv[]);
return ;
} int key = atoi(argv[]);
long type = atoi(argv[]); /** 获得指定的消息队列 */
int msg_id;
if((msg_id = msgget(key, )) < ){
perror("msgget error");
}
printf("msg id: %d\n", msg_id); /** 从消息队列中接收指定类型的消息 */
MSG m;
if(msgrcv(msg_id, &m, sizeof(MSG) - sizeof(long), type, IPC_NOWAIT) < ){
perror("msgrcv error");
}
else{
printf("type: %ld start: %d end: %d\n", m.type, m.start, m.end);
} return ;
}

  编译后,与 msg_snd 联调

  四十九、进程间通信——System V IPC 之消息队列

  四十九、进程间通信——System V IPC 之消息队列

  再次接收 6 的时候,已经没有消息了

  四十九、进程间通信——System V IPC 之消息队列

  消息队列还存在,但是消息都已经被接受,接受一条消息后,内核就会自动删除那条消息