Select\Poll\Epoll异步IO与事件驱动

时间:2023-03-09 16:35:16
Select\Poll\Epoll异步IO与事件驱动

事件驱动与异步IO

事件驱动编程是一种编程规范,这里程序的执行流由外部事件来规定。它的特点是包含一个事件循环,但外部事件发生时使用回调机制来触发响应的处理。另外两种常见的编程规范是(单线程)同步以及多线程编程。

 

 

在单线程同步模型中,任务按照顺序执行。如果某个任务因为IO而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断出来的。如果任务之间并没有相互依赖的关系,但仍然需要相互等待的话这就使得程序不必要的降低了运行速度。

 

在多线程版本中,2个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序必须写代码来保护共享资源,防止其他被多线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制加锁,可重入函数,线程局部存储或其他机制来处理线程安全问题。

 

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理IO或者其他昂贵的操作时,注册一个回调到事件循环中,然后当IO操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件来到时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关系线程安全问题。

 

事件驱动模型适用的环境:

  • 程序中有许多任务,而且任务之间不存在因果联系
  • 任务之间高度独立(因为它们不需要相互通信,或着等待彼此)
  • 在等待事件到来时,某些任务会阻塞

 

 

Select \ Poll \ Epoll 异步IO

三者的区别:

select

select最早于1983年出现在4.2BSD中,它通过一个select() 系统调用来监控多个文件描述符的数组,当select() 返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在来看,这也是它所剩不多的优点之一。

select的缺点在于单个进程能够监视的文件描述符的数量存在有最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定开销。

 

poll

poll在1986年诞生于System V Release 3,它和select在本质上没有太大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll() 将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发。

 

epoll

直到Linux2.6才出现了由内核直接支持的实现方式,就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux 2.6下性能最好的多路IO就绪通知方式。

epoll可以同时支持水平触发和边缘触发(只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果没有采取行动,那么它就不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式,在select、poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类是callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

 

 

python select

python的select() 方法直接调用操作系统的IO接口,它监控sockets,open files,and pipes(所有带fileno()方法的文件句柄)何时变成 readable 和 writeable 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过python的解释器。

下面通过echo server例子了解select是如何通过单进程实现同时处理多个非阻塞的scoket连接的

import select
import socket
import sys
import Queue # Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0) # Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address) # Listen for incoming connections
server.listen(5)

slect()方法接收并监控3个通信列表,第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要法术的data(outgoing data),第3个监控错误信息,接下来需要创建2个列表来包含输入和输出信息来传递给select()

# Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write
outputs = [ ]

所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到数据之后立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取出来再发出去。

# Outgoing message queues (socket:Queue)
message_queues = {}

下面是此程序的主循环,调用select()时会阻塞和等待直到新的连接和数据进来

while inputs:

    # Wait for at least one of the sockets to be ready for processing
print >>sys.stderr, '\nwaiting for the next event'
readable, writable, exceptional = select.select(inputs, outputs, inputs)

当你把 inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将它们分别赋值为 readable , writable ,execptional,所有在readable list中的socket连接代表有数据可接收(recv),所有在 writable list中的存放着可以对其进行发送(send)操作的socket 连接,当连接通信出现error时会把error写到 exceptional 列表中。

 

readable list中的socket可以有3种可能状态

第一种是如果这个socket是 main “server” socket,它负责监听客户端的连接,如果 main server socket 出现在readable里,代表这个server端已经ready来接收一个新的连接进来了,为了让这个 main server 能同时处理多个连接,在下面的代码里,我们把这个main server 的socket设置为非阻塞模式。

# Handle inputs
for s in readable: if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print >>sys.stderr, 'new connection from', client_address
connection.setblocking(0)
inputs.append(connection) # Give the connection a queue for data we want to send
message_queues[connection] = Queue.Queue()

第二中情况是这个socket是已经建立的连接,它把数据发了过来,这个时候你就可以通过recv() 来接收它发过来的数据,然后把接收到的数据放到queue里,这样就可以把接收到的数据再传回给客户端了。

else:
data = s.recv(1024)
if data:
# A readable client socket has data
print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)

第三种情况就是这个客户端已经断开了,所以再通过recv() 接收到的数据就是为空了,所以这个时候就可以把这个跟客户端的连接关闭了。

 

else:
# Interpret empty result as closed connection
print >>sys.stderr, 'closing', client_address, 'after reading no data'
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
inputs.remove(s) #inputs中也删除掉
s.close() #把这个连接关闭掉 # Remove message queue
del message_queues[s]

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里面的数据,就把这个数据取出来再发回给这个客户端,是否就把这个连接从 output list 中移除,这样下一次循环select()调用时检测到 outputs list中没有这个连接,那就会认为这个连接还处于非活动状态。

# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stop checking for writability.
print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
outputs.remove(s)
else:
print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
s.send(next_msg)

组后,如果在跟某个socket连接通信过程中出现了错误,就把这个连接对象在 inputs、outputs、message_queue中都删除,再把连接关闭掉

# Handle "exceptional conditions"
for s in exceptional:
print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close() # Remove message queue
del message_queues[s]

 

 

最后服务器端的完整代码:

import select
import socket
import sys
import queue # Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False) # Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address) # Listen for incoming connections
server.listen(5) # Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write
outputs = [ ] message_queues = {}
while inputs: # Wait for at least one of the sockets to be ready for processing
print( '\nwaiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
# Handle inputs
for s in readable: if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print('new connection from', client_address)
connection.setblocking(False)
inputs.append(connection) # Give the connection a queue for data we want to send
message_queues[connection] = queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print('closing', client_address, 'after reading no data')
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
inputs.remove(s) #inputs中也删除掉
s.close() #把这个连接关闭掉 # Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking for writability.
print('output queue for', s.getpeername(), 'is empty')
outputs.remove(s)
else:
print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print('handling exceptional condition for', s.getpeername() )
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close() # Remove message queue
del message_queues[s]

 

客户端完整代码:

import socket
import sys messages = [ 'This is the message. ',
'It will be sent ',
'in parts.',
]
server_address = ('localhost', 10000) # Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
] # Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
s.connect(server_address) for message in messages: # Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
s.send(message) # Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
if not data:
print >>sys.stderr, 'closing socket', s.getsockname()
s.close()

相关文章