Blocking I/O (Wait Queues)

Date: 2022-06-24 21:06:16

    In real-world driver code we often run into this situation: a read or write is issued, but the device or the data is not ready yet. We need a strategy that takes the current process off the CPU so resources are not wasted, and that wakes it up again once the device or data becomes ready, so that system resources are used efficiently. That strategy is sleep and wakeup.


1. Sleep and Wakeup:

    Once a process goes to sleep it is removed from the scheduler's run queue; only when something wakes it up at the right moment is it placed back on the run queue to compete for the CPU again.

 

    In short, putting a process to sleep has to be done safely, so the following rules apply:

1) Never sleep in atomic context; concretely, do not sleep while holding a spinlock, a seqlock or an RCU read-side lock. Imagine occupying the toilet and then falling asleep in there: how are the people waiting outside supposed to feel?
2) If several processes are waiting for the same resource, each one must re-check the condition after it wakes up, since another waiter may already have taken the resource;
3) And of course there has to be a wakeup mechanism. This is sleep, not death.

    A mandatory step in going to sleep is giving up the CPU, which is done by schedule(). The most common way to implement blocking I/O is to build on the kernel's wait queues.
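    As a minimal sketch (not taken from the article's example code), the hand-off itself looks like this; on its own the task would only be woken by a signal or an explicit wake_up_process(), so real code combines it with a wait queue as described below:

set_current_state(TASK_INTERRUPTIBLE);  /* mark this task as no longer runnable */
schedule();                             /* give up the CPU until something wakes us */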


2. Wait Queues:

    A wait queue is the mechanism the Linux kernel provides for blocking a process so that system resources are used efficiently. It has to support two things: putting a process to sleep, and waking it up at the right moment. Concretely, the operations are: define, initialize, add, remove, wait for an event (which puts the current process to sleep), and wake up.


    2-1. Definition and Initialization:

    In Linux, a wait queue is managed through a "wait queue head", a structure of type wait_queue_head_t. It can be defined either statically or dynamically:

    Static form:

DECLARE_WAIT_QUEUE_HEAD(name);
    Creates a wait queue head named name.

    Dynamic form:

wait_queue_head_t my_queue;
init_waitqueue_head(&my_queue);
    In practice the dynamic form is what we use most of the time.
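    To make the two forms concrete, here is a small sketch; the struct name my_dev and the helper my_dev_setup() are made up for illustration:

#include <linux/wait.h>

/* Static form: declares and initializes the head in one step. */
static DECLARE_WAIT_QUEUE_HEAD(static_wq);

/* Dynamic form: typical when the head lives inside a device structure. */
struct my_dev {
        wait_queue_head_t inq;
};

static void my_dev_setup(struct my_dev *dev)
{
        init_waitqueue_head(&dev->inq);  /* must run before anyone sleeps on it */
}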

    2-2. Adding/Removing Wait Queue Entries:

extern void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);
    Adds the wait queue entry wait to the list managed by the wait queue head q.

extern void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);
    Removes the wait queue entry wait from the list managed by the wait queue head q.
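    These two functions are normally hidden inside wait_event() or prepare_to_wait(), but they can also be used by hand. The following is only a sketch of that manual pattern; my_wq and condition stand for whatever queue head and wakeup condition the driver actually uses:

DECLARE_WAITQUEUE(wait, current);               /* wait queue entry bound to this task */

add_wait_queue(&my_wq, &wait);                  /* hang ourselves on the queue */
for (;;) {
        set_current_state(TASK_INTERRUPTIBLE);  /* mark as sleeping before the check */
        if (condition)                          /* re-check to avoid a lost wakeup */
                break;
        if (signal_pending(current))            /* let a signal abort the wait */
                break;
        schedule();                             /* give up the CPU */
}
set_current_state(TASK_RUNNING);
remove_wait_queue(&my_wq, &wait);               /* take ourselves off the queue */

    Setting the task state before testing the condition is what closes the race against a wakeup that arrives in between.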


    2-3. Waiting for an Event:

    This is the core of the wait queue API; calling one of the following puts the current process to sleep:

wait_event(wq, condition)
    wq is the wait queue head; condition is the wakeup condition: when it evaluates to true the process is woken (or never sleeps at all), when false it keeps sleeping.

wait_event_interruptible(wq, condition)
    Same parameters as above, but the sleep can be interrupted by a signal.

wait_event_timeout(wq, condition, timeout)
    Returns once the timeout expires even if condition never became true; timeout is in jiffies.

wait_event_interruptible_timeout(wq, condition, timeout)
    Same parameters as above, and in addition the sleep can be interrupted by a signal.
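    A typical use in a read path checks the return value so that a signal can abort the wait; dev->inq and have_data here are just placeholder names:

/* Sleep until have_data becomes true, or a signal arrives. */
if (wait_event_interruptible(dev->inq, have_data))
        return -ERESTARTSYS;    /* woken by a signal: let the kernel restart the syscall */
/* have_data is true here, carry on with the read */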


    2-4. Wakeup:

void wake_up(wait_queue_head_t *queue);
    Wakes up all wait queue entries attached to this wait queue head.

void wake_up_interruptible(wait_queue_head_t *queue);
    Same parameter as above, but when several entries hang off one head it only wakes the processes that are in the TASK_INTERRUPTIBLE state; think of it as an optimization.
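    The producer side is the mirror image: make the condition true first, then wake the sleepers (again, the names are only illustrative):

have_data = true;                   /* make the wait condition true first */
wake_up_interruptible(&dev->inq);   /* then wake the interruptible sleepers */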


3. Wait Queue Head vs. Wait Queue Entry:

    As we have seen, the wait queue head is the "manager" that owns the wait queue entries attached to it. Yet in a lot of real driver code we only ever seem to touch the head, and the wait queue entry we keep talking about never shows up. So where does that entry come from, and what does it have to do with the current process? Let's trace the logic through wait_event_interruptible():

#define wait_event_interruptible(wq, condition)                        \
({                                                                      \
        int __ret = 0;                                                  \
        if (!(condition))                                               \
                __wait_event_interruptible(wq, condition, __ret);       \
        __ret;                                                          \
})
    Expanding the macro __wait_event_interruptible():

#define __wait_event_interruptible(wq, condition, ret)                  \
do {                                                                    \
        DEFINE_WAIT(__wait);                                            \
                                                                        \
        for (;;) {                                                      \
                prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE);      \
                if (condition)                                          \
                        break;                                          \
                if (!signal_pending(current)) {                         \
                        schedule();                                     \
                        continue;                                       \
                }                                                       \
                ret = -ERESTARTSYS;                                     \
                break;                                                  \
        }                                                               \
        finish_wait(&wq, &__wait);                                      \
} while (0)
    Here the wait queue entry __wait finally appears. It is generated automatically by the DEFINE_WAIT() macro rather than created by hand, and it is tied directly to the current process:

#define DEFINE_WAIT(name)  DEFINE_WAIT_FUNC(name, autoremove_wake_function)

#define DEFINE_WAIT_FUNC(name, function)                                \
        wait_queue_t name = {                                           \
                .private        = current,                              \
                .func           = function,                             \
                .task_list      = LIST_HEAD_INIT((name).task_list),     \
        }

    As you can see, __wait.private stores current, i.e. the information of the current process.

    Next, look at prepare_to_wait():

void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
        unsigned long flags;

        wait->flags &= ~WQ_FLAG_EXCLUSIVE;
        spin_lock_irqsave(&q->lock, flags);
        if (list_empty(&wait->task_list))
                __add_wait_queue(q, wait);
        set_current_state(state);
        spin_unlock_irqrestore(&q->lock, flags);
}

    You can see that set_current_state() is what marks the current process with its new state.

    Once prepare_to_wait() has finished setting up the current process, schedule() is called to reschedule; based on the run-queue bookkeeping maintained by the scheduling algorithm, it decides which process gets to run next.
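    Putting the pieces together, the same logic that wait_event_interruptible() generates can also be written out by hand with these primitives. A sketch, with my_wq and condition as placeholders:

DEFINE_WAIT(my_wait);                    /* wait queue entry bound to current */

for (;;) {
        prepare_to_wait(&my_wq, &my_wait, TASK_INTERRUPTIBLE);
        if (condition)                   /* re-check after queueing ourselves */
                break;
        if (signal_pending(current))     /* a signal aborts the wait */
                break;
        schedule();                      /* give up the CPU */
}
finish_wait(&my_wq, &my_wait);           /* back to TASK_RUNNING and off the queue */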

4. Example:

  The example below comes from the 国嵌 (GQ-Embedded) training material:

  memdev.h:

#ifndef _MEMDEV_H_
#define _MEMDEV_H_

#ifndef MEMDEV_MAJOR
#define MEMDEV_MAJOR 0
#endif

#ifndef MEMDEV_NR_DEVS
#define MEMDEV_NR_DEVS 2
#endif

#ifndef MEMDEV_SIZE
#define MEMDEV_SIZE 4096
#endif

struct mem_dev
{
        char *data;
        unsigned long size;
        wait_queue_head_t inq;
};
#endif

  memdev.c:

#include <linux/module.h>
#include <linux/types.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/mm.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/cdev.h>
#include <asm/io.h>
#include <asm/system.h>
#include <asm/uaccess.h>

#include <linux/poll.h>
#include "memdev.h"

static int mem_major = MEMDEV_MAJOR;
bool have_data = false;
module_param(mem_major, int, S_IRUGO);

struct mem_dev *mem_devp;

struct cdev cdev;

int mem_open(struct inode *inode, struct file *filp)
{
        struct mem_dev *dev;
        int num = MINOR(inode->i_rdev);

        if (num >= MEMDEV_NR_DEVS)
                return -ENODEV;
        dev = &mem_devp[num];

        filp->private_data = dev;

        return 0;
}

int mem_release(struct inode *inode, struct file *filp)
{
        return 0;
}

static ssize_t mem_read(struct file *filp, char __user *buf, size_t size, loff_t *ppos)
{
        unsigned long p = *ppos;
        unsigned int count = size;
        int ret = 0;
        struct mem_dev *dev = filp->private_data;

        if (p >= MEMDEV_SIZE)
                return 0;
        if (count > MEMDEV_SIZE - p)
                count = MEMDEV_SIZE - p;

        /* Block until data is available, unless the file was opened O_NONBLOCK. */
        while (!have_data) {
                if (filp->f_flags & O_NONBLOCK)
                        return -EAGAIN;

                if (wait_event_interruptible(dev->inq, have_data))
                        return -ERESTARTSYS;    /* interrupted by a signal */
        }

        if (copy_to_user(buf, (void *)(dev->data + p), count))
        {
                ret = -EFAULT;
        }
        else
        {
                *ppos += count;
                ret = count;

                printk(KERN_INFO "read %u byte(s) from %lu\n", count, p);
        }

        have_data = false;      /* the data has been consumed */
        return ret;
}

static ssize_t mem_write(struct file *filp, const char __user *buf, size_t size, loff_t *ppos)
{
        unsigned long p = *ppos;
        unsigned int count = size;
        int ret = 0;
        struct mem_dev *dev = filp->private_data;

        if (p >= MEMDEV_SIZE)
                return 0;
        if (count > MEMDEV_SIZE - p)
                count = MEMDEV_SIZE - p;

        if (copy_from_user(dev->data + p, buf, count))
                ret = -EFAULT;
        else
        {
                *ppos += count;
                ret = count;

                printk(KERN_INFO "written %u byte(s) to %lu\n", count, p);
        }

        have_data = true;       /* data is now available for readers */
        wake_up(&(dev->inq));   /* wake any process sleeping in mem_read() */

        return ret;
}

static loff_t mem_llseek(struct file *filp, loff_t offset, int whence)
{
        loff_t newpos;

        switch (whence) {
        case 0:
                newpos = offset;
                break;

        case 1:
                newpos = filp->f_pos + offset;
                break;

        case 2:
                newpos = MEMDEV_SIZE - 1 + offset;
                break;

        default:
                return -EINVAL;
        }

        if ((newpos < 0) || (newpos > MEMDEV_SIZE))
                return -EINVAL;

        filp->f_pos = newpos;
        return newpos;
}

unsigned int mem_poll(struct file *filp, poll_table *wait)
{
        struct mem_dev *dev = filp->private_data;
        unsigned int mask = 0;

        poll_wait(filp, &dev->inq, wait);

        if (have_data)
                mask |= POLLIN | POLLRDNORM;

        return mask;
}

static const struct file_operations mem_fops =
{
        .owner   = THIS_MODULE,
        .llseek  = mem_llseek,
        .read    = mem_read,
        .write   = mem_write,
        .open    = mem_open,
        .release = mem_release,
        .poll    = mem_poll,
};

static int memdev_init(void)
{
        int result;
        int i;

        dev_t devno = MKDEV(mem_major, 0);

        if (mem_major)
                result = register_chrdev_region(devno, MEMDEV_NR_DEVS, "memdev");
        else
        {
                result = alloc_chrdev_region(&devno, 0, MEMDEV_NR_DEVS, "memdev");
                mem_major = MAJOR(devno);
        }

        if (result < 0)
                return result;

        cdev_init(&cdev, &mem_fops);
        cdev.owner = THIS_MODULE;
        cdev.ops = &mem_fops;

        cdev_add(&cdev, MKDEV(mem_major, 0), MEMDEV_NR_DEVS);

        mem_devp = kmalloc(MEMDEV_NR_DEVS * sizeof(struct mem_dev), GFP_KERNEL);
        if (!mem_devp)
        {
                result = -ENOMEM;
                goto fail_malloc;
        }
        memset(mem_devp, 0, MEMDEV_NR_DEVS * sizeof(struct mem_dev));

        for (i = 0; i < MEMDEV_NR_DEVS; i++)
        {
                mem_devp[i].size = MEMDEV_SIZE;
                mem_devp[i].data = kmalloc(MEMDEV_SIZE, GFP_KERNEL);
                memset(mem_devp[i].data, 0, MEMDEV_SIZE);

                init_waitqueue_head(&(mem_devp[i].inq));
        }

        return 0;

fail_malloc:
        unregister_chrdev_region(devno, MEMDEV_NR_DEVS);
        return result;
}
static void memdev_exit(void)
{
        int i;

        cdev_del(&cdev);
        for (i = 0; i < MEMDEV_NR_DEVS; i++)
                kfree(mem_devp[i].data);        /* free each device buffer */
        kfree(mem_devp);
        unregister_chrdev_region(MKDEV(mem_major, 0), MEMDEV_NR_DEVS);
}

MODULE_AUTHOR("David Xie");
MODULE_LICENSE("GPL");

module_init(memdev_init);
module_exit(memdev_exit);
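  A simple way to exercise the blocking behavior (assuming the device node has been created with mknod using the major number assigned at load time, e.g. /dev/memdev0): run cat /dev/memdev0 in one terminal and it blocks, because have_data is still false; then echo hello > /dev/memdev0 from another terminal sets have_data, calls wake_up(), and the blocked reader returns with the data.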