IO 模型 IO 多路复用

时间:2021-08-02 07:10:59

IO 模型 IO 多路复用

IO多路复用:模型(解决问题的方案)

同步:一个任务提交以后,等待任务执行结束,才能继续下一个任务

异步:不需要等待任务执行结束,

阻塞:IO阻塞,程序卡住了

非阻塞:不阻塞

IO模型:

  1. 阻塞 IO
  2. 非阻塞 IO
  3. IO 多路复用
  4. 异步 IO

阻塞 IO :

服务端:

import socket
import time server = socket.socket()
# 允许地址复用
server.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 8001)) server.listen(5)
# 设置非阻塞
server.setblocking(False) while 1:
time.sleep(0.2)
try:
# 等待 用户链接
conn, addr = server.accept()
print(addr) except BlockingIOError:
print('在做其他的事情') # print('r_list: ', len(r_list))
# print('w_dict: ', len(w_dict))

客户端:

import socket
import time client = socket.socket() client.connect(("127.0.0.1", 8001)) while 1:
data = input(">>>>>>")
if not data:
continue
client.send(data.encode("utf-8")) print(client.recv(1024).decode("utf-8"))

非阻塞 IO:

  • 将 server 端设为 IP 地址复用 监听个数增加

  • 设为非阻塞

  • 循环 等待用户连接, 将连接的用户放入列表 未连接 会报错 处理未连接的 异常

  • 循环接收信息 处理未接收的信息 异常 强制断开异常

    强调强调强调:!!!非阻塞IO的精髓在于完全没有阻塞!!!

服务端:

import socket
import time # 在非阻塞式IO中, 用户进程其实是需要不断的主动询问kernel数据准备好了没有。 server = socket.socket()
# 允许地址复用
server.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 8001)) server.listen(5)
# 设置非阻塞
server.setblocking(False)
# 存放 所有已经链接的 用户 通道
r_list = []
# 存储所有 用户发送来的 消息
w_dict = {}
while 1:
time.sleep(0.2)
try:
# 等待 用户链接
conn, addr = server.accept()
# 将 已经链接的 用户放入 用户列表
r_list.append(conn)
except BlockingIOError:
pass
# 非阻塞模式下 如果没有用户链接会报错
# print(111) # print('在做其他的事情')
# print('r_list: ', len(r_list))
# print('w_dict: ', len(w_dict)) # 存放 错误的用户信息
del_list = []
for con in r_list:
try:
# 接收消息
data = con.recv(1024)
# 如果接收到 空信息 则证明 用户已经断开连接
if not data:
# 关闭 这个链接
con.close()
del_list.append(con)
continue
else:
# 将接收到的 信息 保存到列表中
w_dict[con] = data.decode("utf-8")
except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
pass
except ConnectionResetError: # 当前套接字出异常,则关闭,然后加入删除列表,等待被清除
con.close() # 强制断开连接
del_list.append(con)
pass
# 删除 错误 链接
# print(r_list)
# 遍历写列表,依次取出套接字发送内容
del_wlist=[]
for con in del_list:
r_list.remove(con)
# print(w_dict)
# 打印所有信息 并回复
for k,v in w_dict.items():
print("用户:%s\n>>>>>>%s" % (k,v))
k.send("你还好吗?".encode("utf-8"))
# 清空信息字典
#del_rlist.clear() #清空列表中保存的已经删除的内容
for conn in del_wlist:
w_list.pop(conn)
# w_dict.clear()

客户端:

import socket
import time # 在非阻塞式IO中, 用户进程其实是需要不断的主动询问kernel数据准备好了没有。 client = socket.socket() client.connect(("127.0.0.1", 8001)) while 1:
data = input(">>>>>>")
if not data:
continue
client.send(data.encode("utf-8")) print(client.recv(1024).decode("utf-8"))

IO 多路复用 :

  • select : 代理监听所有的对象 轮训监听列表

  • 机制:

    select 机制: windows linux 自动切换 最多监听 32位机默认是1024个。64位机默认是2048.

    • 时间复杂度O(n)

    poll 机制: Linux 它没有最大连接数的限制,原因是它是基于链表来存储的

    • 时间复杂度O(n)

    epoll 机制: Linux

    • 时间复杂度O(1)

    • epoll的优点:

      1、没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)

      2、效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;

      即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll

客户端:

import select
import socket server = socket.socket() server.bind(("127.0.0.1", 8001)) server.listen(5)
# 设置非阻塞 不等待任何IO 操作
server.setblocking(False) # 所有 监听的 列表
r_list = [server, ]
# 存放客户端发送过来的消息
r_data = {}
# 所有 有动作的 管道 列表
w_list = []
# 存放要返回给客户端的消息
w_data = {} while 1:
# 开始 select 监听,对r_list中的服务端server进行监听,select函数阻塞进程,直到r_list中的套接字被触发
# (在此例中,套接字接收到客户端发来的握手信号,从而变得可读,满足select函数的“可读”条件)
# ,被触发的(有动静的)套接字(服务器套接字)返回给了rl这个返回值里面;
rl, wl, al = select.select(r_list, w_list, [])
# 对rl进行循环判断是否有客户端连接进来,当有客户端连接进来时select将触发
for con in rl:
# 判断当前触发的是不是socket对象, 当触发的对象是socket对象时,说明有新客户端accept连接进来了
if con == server:
conn, addr = server.accept()
# 把新的客户端连接加入到监听列表中,当客户端的连接有接收消息的时候,select将被触发,会知道这个连接有动静,有消息,那么返回给rl这个返回值列表里面。
r_list.append(conn)
else:
try:
# 是客户端连接对象触发 接收消息
data = con.recv(1024)
# 没有数据的时候,我们将这个连接关闭掉,并从监听列表中移除
if not data:
con.close()
r_list.remove(con)
else:
# 存放 客户端发送过来的消息
r_data[con] = data.decode("utf-8")
# 将客户端连接对象和这个对象接收到的消息加工成返回消息,并添加到wdata这个字典里面
w_data[con] = data.upper()
# 需要给这个客户端回复消息的时候,我们将这个连接添加到wlist写监听列表中
w_list.append(con)
# 如果这个连接出错了,客户端暴力断开了(注意,我还没有接收他的消息,或者接收他的消息的过程中出错了)
except ConnectionResetError as E:
print(E)
# 关闭连接
con.close()
# 将这个链接从监听列表中删除
r_list.remove(con)
# 如果现在没有客户端请求连接,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息
for data in wl:
data.send(w_data[data])
w_data.pop(data)
w_list.remove(data) # 打印所有接收到的数据
for k, v in r_data.items():
print("用户:%s\n%s" % (k, v))
# 清空信息字典
r_data.clear()

客户端:


import socket client = socket.socket() server_ip = ("127.0.0.1", 8001)
# 链接指定的 服务端
client.connect(server_ip)
while 1:
client.send(input(">>>>>").encode("utf-8")) data = client.recv(1024).decode("utf-8")
print(data)