3--消息队列(报文队列)实践到内核--消息的接收

时间:2022-06-11 17:23:06
我是无名小卒,一直想写一些关于内核方面的资料,学习内核很久了,市面上的内核书我都读过了,无法对任何一本书加以总结,因为他就象linux的内核一样在不断更新和升级,针对2.6内核现在市面上非常缺少相关内核的分析资料情况,当然,也有不少网友写了一些关于2.6内核的博客文章,我也看过,但是写的不够深刻具体,总是在内核的过程上粗略的一笔带过,因此我下决心只要有空闲时间就写一些日志来与大家分享,很多书籍和博客都是直接剖析内核的,我读过这些书后发现,这种学习方法虽然快,但是效果不太好,特别是我读了几遍后还是对很多知识点和结构记忆不深,对初学者来说更是枯燥无味的过程,使很多朋友放弃了研读内核代码的兴趣,确实如此,就象缺一张好的导游地图一样,如果我们目的明确,但是缺乏了指路的地图那无非是在大森林里迷了路一样,再高的旅游兴趣也荡然无存,因此,我想要是有一本书或者资料能够在实践中逐步深入到内核该有多好,那样能够使我们在即看到效果的时候导游到如何产生这样效果的内核中将会是一件非常有趣的事情,肯定能够轻松地掌握全部想要的知识,目前市面上有这样的书,但是评价不怎么好,所以我想根据多年读内核的书来整理和书写这类的文章,希望对有兴趣的朋友起到导游地图的效果。这些文章有可能是来自大家所熟悉的资料中也有可能来自互联网,总之,多多益善,我不会在文中注明具体出处,也希望原作者勿怪,我们的目标是大家共同进步。 请注意本文使用的是基于2.6.26的内核,这是个人认为应该稳定使用的版本。
 
接着前面的章节,我们今天追踪消息是如何接收的
msgrcv(msgid,(void*)&some_data,BUFSIZ,msg_to_receive,0);
 
不明白的朋友请看前边章节,我们就不重复 sys_ipc()函数了,忘记的朋友可以返回前二节看一下,我们直接从里面的一句关键代码开始
 

    case MSGRCV:
        switch (version) {
        case 0: {
            struct ipc_kludge tmp;
            if (!ptr)
                return -EINVAL;
            
            if (copy_from_user(&tmp,
                     (struct ipc_kludge __user *) ptr,
                     sizeof (tmp)))
                return -EFAULT;
            return sys_msgrcv (first, tmp.msgp, second,
                     tmp.msgtyp, third);
        }
        default:
            return sys_msgrcv (first,
                     (struct msgbuf __user *) ptr,
                     second, fifth, third);
        }

可以看出嵌套了一个swith语句,这里多了一个数据结构ipc_kludge,我们呆会探讨,整段代码可以看到最关键的函数是sys_msgrcv ()函数,我们跟进看一下

asmlinkage long sys_msgrcv(int msqid, struct msgbuf __user *msgp, size_t msgsz,
             long msgtyp, int msgflg)
{
    long err, mtype;

    err = do_msgrcv(msqid, &mtype, msgp->mtext, msgsz, msgtyp, msgflg);
    if (err < 0)
        goto out;

    if (put_user(mtype, &msgp->mtype))
        err = -EFAULT;
out:
    return err;
}

上面我们提到一个数据结构

struct ipc_kludge {
    struct msgbuf __user *msgp;
    long msgtyp;
};

这个数据结构其实是专门封装我们前二节看到的消息缓存结构msgbuf用的,只不过多了一个msgtyp即消息的类型。我们看到sys_msgrcv()内部调用了另一个函数do_msgrcv()函数,我是无名小卒,和我一起跟进他

long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
        size_t msgsz, long msgtyp, int msgflg)
{
    struct msg_queue *msq;
    struct msg_msg *msg;
    int mode;
    struct ipc_namespace *ns;

    if (msqid < 0 || (long) msgsz < 0)
        return -EINVAL;
    mode = convert_mode(&msgtyp, msgflg);
    ns = current->nsproxy->ipc_ns;

    msq = msg_lock_check(ns, msqid);
    if (IS_ERR(msq))
        return PTR_ERR(msq);

我们看到好几个熟悉的结构和函数了,这些前面都介绍了,如果还不能理解可以看看前二节,我们只在这里假定已经找到了消息队列msq,继续往下看

for (;;) {
        struct msg_receiver msr_d;
        struct list_head *tmp;

        msg = ERR_PTR(-EACCES);
        if (ipcperms(&msq->q_perm, S_IRUGO))
            goto out_unlock;

        msg = ERR_PTR(-EAGAIN);
        tmp = msq->q_messages.next;
        while (tmp != &msq->q_messages) {
            struct msg_msg *walk_msg;

            walk_msg = list_entry(tmp, struct msg_msg, m_list);
            if (testmsg(walk_msg, msgtyp, mode) &&
             !security_msg_queue_msgrcv(msq, walk_msg, current,
                         msgtyp, mode)) {

                msg = walk_msg;
                if (mode == SEARCH_LESSEQUAL &&
                        walk_msg->m_type != 1) {
                    msg = walk_msg;
                    msgtyp = walk_msg->m_type - 1;
                } else {
                    msg = walk_msg;
                    break;
                }
            }
            tmp = tmp->next;
        }
        if (!IS_ERR(msg)) {
            /*
             * Found a suitable message.
             * Unlink it from the queue.
             */

            if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
                msg = ERR_PTR(-E2BIG);
                goto out_unlock;
            }
            list_del(&msg->m_list);
            msq->q_qnum--;
            msq->q_rtime = get_seconds();
            msq->q_lrpid = task_tgid_vnr(current);
            msq->q_cbytes -= msg->m_ts;
            atomic_sub(msg->m_ts, &ns->msg_bytes);
            atomic_dec(&ns->msg_hdrs);
            ss_wakeup(&msq->q_senders, 0);
            msg_unlock(msq);
            break;
        }
        /* No message waiting. Wait for a message */
        if (msgflg & IPC_NOWAIT) {
            msg = ERR_PTR(-ENOMSG);
            goto out_unlock;
        }
        list_add_tail(&msr_d.r_list, &msq->q_receivers);
        msr_d.r_tsk = current;
        msr_d.r_msgtype = msgtyp;
        msr_d.r_mode = mode;
        if (msgflg & MSG_NOERROR)
            msr_d.r_maxsize = INT_MAX;
        else
            msr_d.r_maxsize = msgsz;
        msr_d.r_msg = ERR_PTR(-EAGAIN);
        current->state = TASK_INTERRUPTIBLE;
        msg_unlock(msq);

        schedule();

        /* Lockless receive, part 1:
         * Disable preemption. We don't hold a reference to the queue
         * and getting a reference would defeat the idea of a lockless
         * operation, thus the code relies on rcu to guarantee the
         * existance of msq:
         * Prior to destruction, expunge_all(-EIRDM) changes r_msg.
         * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
         * rcu_read_lock() prevents preemption between reading r_msg
         * and the spin_lock() inside ipc_lock_by_ptr().
         */

        rcu_read_lock();

        /* Lockless receive, part 2:
         * Wait until pipelined_send or expunge_all are outside of
         * wake_up_process(). There is a race with exit(), see
         * ipc/mqueue.c for the details.
         */

        msg = (struct msg_msg*)msr_d.r_msg;
        while (msg == NULL) {
            cpu_relax();
            msg = (struct msg_msg *)msr_d.r_msg;
        }

        /* Lockless receive, part 3:
         * If there is a message or an error then accept it without
         * locking.
         */

        if (msg != ERR_PTR(-EAGAIN)) {
            rcu_read_unlock();
            break;
        }

        /* Lockless receive, part 3:
         * Acquire the queue spinlock.
         */

        ipc_lock_by_ptr(&msq->q_perm);
        rcu_read_unlock();

        /* Lockless receive, part 4:
         * Repeat test after acquiring the spinlock.
         */

        msg = (struct msg_msg*)msr_d.r_msg;
        if (msg != ERR_PTR(-EAGAIN))
            goto out_unlock;

        list_del(&msr_d.r_list);
        if (signal_pending(current)) {
            msg = ERR_PTR(-ERESTARTNOHAND);
out_unlock:
            msg_unlock(msq);
            break;
        }
    }
    if (IS_ERR(msg))
        return PTR_ERR(msg);

    msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
    *pmtype = msg->m_type;
    if (store_msg(mtext, msg, msgsz))
        msgsz = -EFAULT;

    free_msg(msg);

    return msgsz;
}

很大的一个for循环,表面看起来很复杂,其实没有那么可怕,解读一下,首先是检查当前进程对队列的操作权限,然后从消息队列中的等待接收的消息头指针开始,逐个比对要接收的消息是否是符合要求的类型和模式,我们来看一下这个关键比对函数testmsg()

static int testmsg(struct msg_msg *msg, long type, int mode)
{
    switch(mode)
    {
        case SEARCH_ANY:
            return 1;
        case SEARCH_LESSEQUAL:
            if (msg->m_type <=type)
                return 1;
            break;
        case SEARCH_EQUAL:
            if (msg->m_type == type)
                return 1;
            break;
        case SEARCH_NOTEQUAL:
            if (msg->m_type != type)
                return 1;
            break;
    }
    return 0;
}

回到上面的函数,只要有符合要求的消息,就把msg指向这个消息,然后对这个消息进行长度符合检查,也就是应用程序是否提供的缓存长度可以保存这个消息,如果可以满足空间要求就要通过

list_del(&msg->m_list);

将消息从队列中摘链,这个函数我们不做介绍了,朋友们可以自己阅读。

既然已经取得了想要的消息,队列中就腾出空间所以唤醒ss_wakeup()发送进程

static void ss_wakeup(struct list_head *h, int kill)
{
    struct list_head *tmp;

    tmp = h->next;
    while (tmp != h) {
        struct msg_sender *mss;

        mss = list_entry(tmp, struct msg_sender, list);
        tmp = tmp->next;
        if (kill)
            mss->list.next = NULL;
        wake_up_process(mss->tsk);
    }

注意一下msg_sender摘链后并没有释放,是因为他在函数内部分配的是局部变量,函数退出后系统会自动释放局部变量所占用的空间。此时顺利接收到消息后就会通过break跳出for循环到达最后面的部分,我们再贴一下代码看看

    if (IS_ERR(msg))
        return PTR_ERR(msg);

    msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
    *pmtype = msg->m_type;
    if (store_msg(mtext, msg, msgsz))
        msgsz = -EFAULT;

    free_msg(msg);

    return msgsz;

消息是如何传送到用户空间的呢?是store_msg()这个函数来完成的,传送完毕后还要将消息占用的空间释放掉。这是通过free_msg()函数完成的。

假如没有消息在队列中呢?会执行这段代码

        /* No message waiting. Wait for a message */
        if (msgflg & IPC_NOWAIT) {
            msg = ERR_PTR(-ENOMSG);
            goto out_unlock;
        }
        list_add_tail(&msr_d.r_list, &msq->q_receivers);
        msr_d.r_tsk = current;
        msr_d.r_msgtype = msgtyp;
        msr_d.r_mode = mode;
        if (msgflg & MSG_NOERROR)
            msr_d.r_maxsize = INT_MAX;
        else
            msr_d.r_maxsize = msgsz;
        msr_d.r_msg = ERR_PTR(-EAGAIN);
        current->state = TASK_INTERRUPTIBLE;
        msg_unlock(msq);

        schedule();

如果没有设置等待标记IPC_NOWAIT,那么就返回了不会往下继续执行了,如果允许等待就会把代表当前进程的struct msg_receiver队列中的链接头挂入消息队列的q_receivers队列,这个数据与前一节发送进程的msg_sender是一样的,然后设置一下这个结构的内容并将当前进程设置为中断状态之后,进行一次调试,我们前面提到过如果发送进程发送消息到队列后会唤醒这个接收进程,或者是直接走近道不用将消息挂入队列而是直接通过pipelined_send()函数转交给接收进程,当执行到schedule()下边的代码时说明我们的接收进程是被发送进程唤醒后,我们看一下后面有一句

msg = (struct msg_msg*)msr_d.r_msg;

这里是从代表进程的结构中取得消息,为什么会接着有这句代码呢?

        if (msg != ERR_PTR(-EAGAIN)) {
            rcu_read_unlock();
            break;
        }

我们看一下pipelined_send()前一节函数的代码就知道了,他是为了传输出错的原因表示码,例如这里是指明的是检查他是不是-EAGAIN,这是在前边接收进程睡眠之间设置的

msr_d.r_msg = ERR_PTR(-EAGAIN);

这里的意思也就是说不是这个出错码就说明成功接收到消息,所以释放前边加的rcu锁。最后看一下还是否有信号等待处理,没有的话就执行到后边把消息复制给应用程序。struct msg_receiver msr_d是在for循环中声明的,它也是局部变量手动释放空间。