以python理解Linux的IO多路复用,select、poll、epoll

时间:2023-03-09 20:05:52
以python理解Linux的IO多路复用,select、poll、epoll

题外话

之前在看Unix环境高级编程的时候,看完高级IO那一章,感觉自己萌萌哒,0.0 ,有点囫囵吞枣的感觉,之后翻了几篇博客,从纯系统的角度理解,稍微有了点概念,以这两篇为例,可以以后参考:

http://www.cnblogs.com/Anker/p/3265058.html

https://segmentfault.com/a/1190000003063859%20

不过还是理解个大概,最近通过阅读python的select模块,以及翻了几篇关于python实现异步IO的例子的博客,对它们有点豁然开朗的感觉,再重新翻看高级IO一章,突然觉得有点意思。

正题:由python引入

这里参考了这篇博客的内容:   http://www.cnblogs.com/coser/archive/2012/01/06/2315216.html

1、白话select、poll、epoll的区别

select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll

直到Linux2.6(2003年发布)才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

select实例 

在python中,select函数是一个对底层操作系统的直接访问的接口,属于c的原生接口函数。它用来监控sockets、files和pipes,等待IO完成(Waiting for I/O completion)。当有可读、可写或是异常事件产生时,select可以很容易的监控到。python的select模块定义的select方法如下:

def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__
pass

select.select(rlist, wlist, xlist[, timeout]) 传递三个参数,一个为输入而观察的文件对象列表,一个为输出而观察的文件对象列表和一个观察错误异常的文件列表。第四个是一个可选参数,表示超时秒数。其返回3个tuple,每个tuple都是一个准备好的对象列表,它和前边的参数是一样的顺序。下面还是用别人的代码理解吧,自己写的太烂。。。。。

server端


# -*- coding:utf-8 -*-

import select

import socket

import Queue

# create a socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(False)   #False is equivalent to settimeout(0.0).   True ,is equivalent to settimeout(None)

# set option reused

server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)     # SOL_SOCKET = 65535    SO_REUSEADDR = 4
# 相当于closesocket(一般不会立即关闭而经历TIME_WAIT的过程)后想继续重用该socket
server_address = ('192.168.0.104', 10001) server.bind(server_address) server.listen(10) # sockets from which we except to read inputs = [server] # sockets from which we expect to write outputs = [] # Outgoing message queues (socket:Queue) message_queues = {} # A optional parameter for select is TIMEOUT timeout = 20 while inputs: print "waiting for next event" readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout) # When timeout reached , select return three empty lists if not (readable or writable or exceptional):
print "Time out ! "
break for s in readable: if s is server: # A "readable" socket is ready to accept a connection connection, client_address = s.accept() print " connection from ", client_address connection.setblocking(0) inputs.append(connection) message_queues[connection] = Queue.Queue() else: data = s.recv(1024) if data: print " received ", data, "from ", s.getpeername() message_queues[s].put(data) # Add output channel for response if s not in outputs:
outputs.append(s) else: # Interpret empty result as closed connection print " closing", client_address if s in outputs:
outputs.remove(s) inputs.remove(s) s.close() # remove message queue del message_queues[s] for s in writable: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: print " ", s.getpeername(), 'queue empty' outputs.remove(s) else: print " sending ", next_msg, " to ", s.getpeername() s.send(next_msg) for s in exceptional: print " exception condition on ", s.getpeername() # stop listening for input on the connection inputs.remove(s) if s in outputs:
outputs.remove(s) s.close() # Remove message queue del message_queues[s]

 

client端

# -*- coding:utf-8 -*-

import socket
import time messages = ["This is the message", "It will be sent", "in parts "] print "Connect to the server" server_address = ("192.168.0.104", 10001) # Create a TCP/IP sock socks = [] for i in range(10):
socks.append(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) #创建10个socketTCP连接,加入socks列表 for s in socks:
s.connect(server_address) #与服务器建立连接 counter = 0 for message in messages: # Sending message from different sockets for s in socks:
counter += 1 #两层循环,完成对每条消息通过十个建立好的socket发送给服务器 print " %s sending %s" % (s.getpeername(), message + " version " + str(counter)) s.send(message + " version " + str(counter)) time.sleep(0.5) #为了看清楚循环的数据交换过程,加的延时
# Read responses on both sockets for s in socks: data = s.recv(1024) print " %s received %s" % (s.getpeername(), data) if not data:
print "closing socket ", s.getpeername() s.close()
#socket._socketobject.close()

使用poll方式完成Server端,这里只能在Linux上运行

# -*- coding:utf-8 -*-

import socket

import select

import Queue

# Create a TCP/IP socket, and then bind and listen

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(False)

server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_address = ("192.168.0.104", 10001)

print  "Starting up on %s port %s" % server_address

server.bind(server_address)

server.listen(5)

message_queues = {}

# The timeout value is represented in milliseconds, instead of seconds.

timeout = 1000

# Create a limit for the event

READ_ONLY = (select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)

READ_WRITE = (READ_ONLY | select.POLLOUT)

# Set up the poller

poller = select.poll()

poller.register(server, READ_ONLY)

# Map file descriptors to socket objects

fd_to_socket = {server.fileno(): server,}

while True:

    print "Waiting for the next event"

    events = poller.poll(timeout)

    print "*" * 20

    print len(events)

    print events

    print "*" * 20

    for fd, flag in events:

        s = fd_to_socket[fd]

        if flag & (select.POLLIN | select.POLLPRI):

            if s is server:

                # A readable socket is ready to accept a connection

                connection, client_address = s.accept()

                print " Connection ", client_address

                connection.setblocking(False)

                fd_to_socket[connection.fileno()] = connection

                poller.register(connection, READ_ONLY)

                # Give the connection a queue to send data

                message_queues[connection] = Queue.Queue()

            else:

                data = s.recv(1024)

                if data:

                    # A readable client socket has data

                    print "  received %s from %s " % (data, s.getpeername())

                    message_queues[s].put(data)

                    poller.modify(s, READ_WRITE)

                else:

                    # Close the connection

                    print "  closing", s.getpeername()

                    # Stop listening for input on the connection

                    poller.unregister(s)

                    s.close()

                    del message_queues[s]

        elif flag & select.POLLHUP:

            # A client that "hang up" , to be closed.

            print " Closing ", s.getpeername(), "(HUP)"

            poller.unregister(s)

            s.close()

        elif flag & select.POLLOUT:

            # Socket is ready to send data , if there is any to send

            try:

                next_msg = message_queues[s].get_nowait()

            except Queue.Empty:

                # No messages waiting so stop checking

                print s.getpeername(), " queue empty"

                poller.modify(s, READ_ONLY)

            else:

                print " sending %s to %s" % (next_msg, s.getpeername())

                s.send(next_msg)

        elif flag & select.POLLERR:

            # Any events with POLLERR cause the server to close the socket

            print "  exception on", s.getpeername()

            poller.unregister(s)

            s.close()

            del message_queues[s]

使用epoll的方式

在Linux环境中,把上面poll实现的方式直接换为select.epoll()也可以运行,这时,不再是轮询的方式,timeout不再起作用,server端会直到监听到客户端发来数据,才有响应。

2、从系统的角度来看:

下面来看select、poll、epoll的接口原型:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),select的第二三四个参数表示需要关注读、写、错误事件的文件描述符位数组,这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件。所以每次调用select前都需要重新初始化fdset。select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符调用进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回;当轮询一遍后没有任何事件发生时,如果指定了超时时间,则select会睡眠到超时,睡眠结束后再进行一次轮询,并将临时结果写到用户空间,然后返回。select返回后,需要逐一检查关注的描述符是否被SET(事件是否发生)。

poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。

epoll通过epoll_create创建一个用于epoll轮询的描述符,通过epoll_ctl添加/修改/删除事件,通过epoll_wait检查事件,epoll不是通过轮询,而是通过在等待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。epoll返回后,该参数指向的缓冲区中即为发生的事件,即epoll返回时已经明确的知道哪个fd发生了事件,不用再一个个比对。这样就提高了效率。同时select的FD_SETSIZE是有限止的,而epoll是没有限止的只与系统资源有关。epoll不会随着监听fd数目的增长而降低效率,因为select采用轮询方式,轮询的fd数目越多,自然耗时越多,而epoll是触发式的,所以效率高。

下面重点看一下epoll的接口:

根据man手册和这篇博客http://blog.****.net/yangwen123/article/details/14119195的描述,epoll仅仅是一个异步事件的通知机制,其本身并不作任何的IO读写操作,它只负责告诉你是不是可以读或可以写了,而具体的读写操作,还要应用程序自己来完成。

epoll通过创建接口epoll_create,返回一个用于epoll轮询的文件描述符,然后通过epoll_ctl,注册一个感兴趣的文件描述符,并把它加入到一个epoll文件描述符的集合中,最后,实际上是通过epoll_wait来检查事件的发生。

--------------------->    int epoll_create(int size)

该函数生成一个epoll专用的文件描述符。它其实是在内核申请一空间,用来存放你想关注的fd上是否发生的事件。size就是你在这个epoll fd上能关注的最大fd数,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

--------------------->    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。 
epfd:由 epoll_create生成的epoll专用的文件描述符; 
op:要进行的操作例如注册事件,可能的取值:
1)EPOLL_CTL_ADD 注册新的fd到epfd中;
2)EPOLL_CTL_MOD修改已经注册的fd的监听事件;
3)EPOLL_CTL_DEL 从epfd中删除一个fd;
fd:需要监听的fd 
event:指向epoll_event的指针,告诉内核需要监听的事件,常用的事件类型: 
1)EPOLLIN :表示对应的文件描述符可以读;
2)EPOLLOUT:表示对应的文件描述符可以写;
3)EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
4)EPOLLERR:表示对应的文件描述符发生错误;
5)EPOLLHUP:表示对应的文件描述符被挂断;
6)EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
7)EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
如果调用成功返回0,不成功返回-1

--------------------->  int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)

该函数用于轮询I/O事件的发生,如果发生则将发生的fd和事件类型放入到events数组中。 并且将注册在epfd上的fd的事件类型给清空,所以如果下一个循环你还要关注这个fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)来重新设置fd的事件类型。这时不用EPOLL_CTL_ADD,因为fd并未清空,只是事件类型清空。
epfd:由epoll_create生成的epoll专用的文件描述符; 
epoll_event:用于回传待处理事件的数组; 
maxevents:每次能处理的事件数; 
timeout:等待I/O事件发生的超时值,为0的时候表示马上返回,为-1的时候表示一直等下去,直到有事件返回。