day 35 协程 IO多路复用

时间:2023-03-10 05:44:11
day 35 协程 IO多路复用

0、基于socket发送Http请求

import socket
import requests # 方式一
ret = requests.get('https://www.baidu.com/s?wd=alex') # 方式二
client = socket.socket() # 百度创建连接: 阻塞
client.connect(('www.baidu.com',80)) # 问百度我要什么?
client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') # 我等着接收百度给我的回复
chunk_list = []
while True:
chunk = client.recv(8096)
if not chunk:
break
chunk_list.append(chunk) body = b''.join(chunk_list)
print(body.decode('utf-8'))

socket发送请求

# by luffycity.com

import socket
import requests
# #################### 解决并发:单线程 ####################
# 方式一
key_list = ['alex','db','sb']
for item in key_list:
ret = requests.get('https://www.baidu.com/s?wd=%s' %item) # 方式二
def get_data(key):
# 方式二
client = socket.socket() # 百度创建连接: 阻塞
client.connect(('www.baidu.com',80)) # 问百度我要什么?
client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') # 我等着接收百度给我的回复
chunk_list = []
while True:
chunk = client.recv(8096)
if not chunk:
break
chunk_list.append(chunk) body = b''.join(chunk_list)
print(body.decode('utf-8')) key_list = ['alex','db','sb']
for item in key_list:
get_data(item) # #################### 解决并发:多线程 ####################
import threading key_list = ['alex','db','sb']
for item in key_list:
t = threading.Thread(target=get_data,args=(item,))
t.start() # #################### 解决并发:单线程+IO不等待 ####################
# IO请求?
# 数据回来了?

socket 单线程和多线程

# by luffycity.com
import socket client = socket.socket()
client.setblocking(False) # 将原来阻塞的位置变成非阻塞(报错)
# 百度创建连接: 阻塞 try:
client.connect(('www.baidu.com',80)) # 执行了但报错了
except BlockingIOError as e:
pass # 检测到已经连接成功 # 问百度我要什么?
client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') # 我等着接收百度给我的回复
chunk_list = []
while True:
chunk = client.recv(8096) # 将原来阻塞的位置变成非阻塞(报错)
if not chunk:
break
chunk_list.append(chunk) body = b''.join(chunk_list)
print(body.decode('utf-8'))

前戏

一、IO多路复用

  1、IO多路复用作用:

    检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可读/可写)

    可以监听所有的IO请求状态

client.setblocking(False) # 将原来阻塞的位置变成非阻塞(报错)
        基于事件循环实现的异步非阻塞框架:lzl
非阻塞:不等待
异步:执行完某个人物后自动调用我给他的函数。 Python中开源 基于事件循环实现的异步非阻塞框架 Twisted

  3、单线程的并发

     基于IO多路复用+socket实现并发请求(一个线程100个请求)

        IO多路复用
socket非阻塞
    import select  #利用此模块 可以检测数据是否拿到

    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
# socket_list 检测服务端是否给我返回数据
# ,conn_list 检测是否连接成功
import socket
import select client1 = socket.socket()
client1.setblocking(False) # 百度创建连接: 非阻塞 try:
client1.connect(('www.baidu.com',80))
except BlockingIOError as e:
pass client2 = socket.socket()
client2.setblocking(False) # 百度创建连接: 非阻塞
try:
client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
pass client3 = socket.socket()
client3.setblocking(False) # 百度创建连接: 非阻塞
try:
client3.connect(('www.oldboyedu.com',80))
except BlockingIOError as e:
pass socket_list = [client1,client2,client3]
conn_list = [client1,client2,client3] while True:
rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
# wlist中表示已经连接成功的socket对象
for sk in wlist:
if sk == client1:
sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
elif sk==client2:
sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n')
else:
sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.oldboyedu.com\r\n\r\n')
conn_list.remove(sk)
for sk in rlist:
chunk_list = []
while True:
try:
chunk = sk.recv(8096)
if not chunk:
break
chunk_list.append(chunk)
except BlockingIOError as e:
break
body = b''.join(chunk_list)
# print(body.decode('utf-8'))
print('------------>',body)
sk.close()
socket_list.remove(sk)
if not socket_list:
break

单线程的并发

        基于事件循环实现的异步非阻塞框架:Twisted
非阻塞:不等待
异步:执行完某个人物后自动调用我给他的函数。 Python中开源 基于事件循环实现的异步非阻塞框架 Twisted
# by luffycity.com
import socket
import select class Req(object):
def __init__(self,sk,func):
self.sock = sk
self.func = func def fileno(self):
return self.sock.fileno() class Nb(object): def __init__(self):
self.conn_list = []
self.socket_list = [] def add(self,url,func):
client = socket.socket()
client.setblocking(False) # 非阻塞
try:
client.connect((url, 80))
except BlockingIOError as e:
pass
obj = Req(client,func)
self.conn_list.append(obj)
self.socket_list.append(obj) def run(self): while True:
rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005)
# wlist中表示已经连接成功的req对象
for sk in wlist:
# 发生变换的req对象
sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
self.conn_list.remove(sk)
for sk in rlist:
chunk_list = []
while True:
try:
chunk = sk.sock.recv(8096)
if not chunk:
break
chunk_list.append(chunk)
except BlockingIOError as e:
break
body = b''.join(chunk_list)
# print(body.decode('utf-8'))
sk.func(body)
sk.sock.close()
self.socket_list.remove(sk)
if not self.socket_list:
break def baidu_repsonse(body):
print('百度下载结果:',body) def sogou_repsonse(body):
print('搜狗下载结果:', body) def oldboyedu_repsonse(body):
print('老男孩下载结果:', body) t1 = Nb()
t1.add('www.baidu.com',baidu_repsonse)
t1.add('www.sogou.com',sogou_repsonse)
t1.add('www.oldboyedu.com',oldboyedu_repsonse)
t1.run() #
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
# client1 = socket.socket()
# client1.setblocking(False) # 百度创建连接: 非阻塞
#
# try:
# client1.connect(('www.baidu.com',80))
# except BlockingIOError as e:
# pass
#
#
# client2 = socket.socket()
# client2.setblocking(False) # 百度创建连接: 非阻塞
# try:
# client2.connect(('www.sogou.com',80))
# except BlockingIOError as e:
# pass
#
#
# client3 = socket.socket()
# client3.setblocking(False) # 百度创建连接: 非阻塞
# try:
# client3.connect(('www.oldboyedu.com',80))
# except BlockingIOError as e:
# pass
#
# class Foo(object):
# def __init__(self,sk):
# self.sk = sk
#
# def fileno(self):
# return self.sk.fileno()
#
# """
# 1. select.select(socket_list,conn_list,[],0.005)
# select监听的 socket_list/conn_list 内部会调用列表中每一个值的fileno方法,获取该返回值并去系统中检测。
#
# 2. 方式一:
# select.select([client1,client2,client3],[client1,client2,client3],[],0.005)
# 3. 方式二:
# select.select([Foo(client1),Foo(client2),(client3)],Foo(client1),Foo(client2),(client3),[],0.005)
# """
# socket_list = [Foo(client1),client2,client3] # client1.fileno
# conn_list = [client1,client2,client3]
#
# while True:
# rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
# # wlist中表示已经连接成功的socket对象
# for sk in wlist:
# if sk == client1:
# sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
# elif sk==client2:
# sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n')
# else:
# sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.oldboyedu.com\r\n\r\n')
# conn_list.remove(sk)
# for sk in rlist:
# chunk_list = []
# while True:
# try:
# chunk = sk.recv(8096)
# if not chunk:
# break
# chunk_list.append(chunk)
# except BlockingIOError as e:
# break
# body = b''.join(chunk_list)
# # print(body.decode('utf-8'))
# print('------------>',body)
# sk.close()
# socket_list.remove(sk)
# if not socket_list:
# break

单线程的并发高级版

总结:
1. socket默认是否是阻塞的?阻塞体现在哪里?
    是阻塞的, 链接的时候 accept recv 2. 如何让socket编程非阻塞?
    .setblocking(Flase) 3. IO多路复用作用?
检测多个socket是否发生变化。
操作系统检测socket是否发生变化,有三种模式:
select:最多1024个socket;循环去检测。
poll:不限制监听socket个数;循环去检测(水平触发)。
epoll:不限制监听socket个数;回调方式(边缘触发)。
Python模块:
select.select
select.epoll 4. 提高并发方案:
- 多进程
- 多线程
- 异步非阻塞模块(Twisted) scrapy框架(单线程完成并发) 5. 什么是异步非阻塞?
- 非阻塞,不等待。
比如创建socket对某个地址进行connect、获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作。
如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可。
- 异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知)。
比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自执行回调函数。 6. 什么是同步阻塞?
- 阻塞:等
- 同步:按照顺序逐步执行 key_list = ['alex','db','sb']
for item in key_list:
ret = requests.get('https://www.baidu.com/s?wd=%s' %item)
print(ret.text) 7. 概念
之前:
            # 你写的代码:7000w
v = [
[11,22], # 每个都有一个append方法
[22,33], # 每个都有一个append方法
[33,44], # 每个都有一个append方法
] # 王思聪
for item in v:
print(item.append)
        之后:
            class Foo(object):
def __init__(self,data,girl):
self.row = data
self.girl = girl def append(self,item):
self.row.append(item) v = [
Foo([11,22],'雪梨'), # 每个都有一个append方法
Foo([22,33],'冰糖'), # 每个都有一个append方法
Foo([33,44],'糖宝'), # 每个都有一个append方法
] for item in v:
print(item.append)
item.girl

二、协程

概念:
进程,操作系统中存在;
线程,操作系统中存在;
协程,是由程序员创造出来的一个不是真实存在的东西; 协程:是微线程,对一个线程进程分片,使得线程在代码块之间进行来回切换执行,而不是在原来逐行执行。
import greenlet

def f1():
print(11)
gr2.switch()
print(22)
gr2.switch() def f2():
print(33)
gr1.switch()
print(44) # 协程 gr1
gr1 = greenlet.greenlet(f1)
# 协程 gr2
gr2 = greenlet.greenlet(f2) gr1.switch() 注意:单纯的协程无用 def f1():
print(11)
print(33) def f2():
print(22)
print(44) f1()
f2()

单独协程

协程 + 遇到IO就切换 => 牛逼起来了

from gevent import monkey
monkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换
import requests
import gevent def get_page1(url):
ret = requests.get(url)
print(url,ret.content) def get_page2(url):
ret = requests.get(url)
print(url,ret.content) def get_page3(url):
ret = requests.get(url)
print(url,ret.content) gevent.joinall([
gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1
gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 协程2
gevent.spawn(get_page3, 'https://github.com/'), # 协程3
])
总结:
1. 什么是协程?
协程也可以称为“微线程”,就是开发者控制线程执行流程,控制先执行某段代码然后再切换到另外函执行代码...来回切换。 2. 协程可以提高并发吗?
协程自己本身无法实现并发(甚至性能会降低)。
协程+IO切换性能提高。 3. 进程、线程、协程的区别? 4. 单线程提供并发:
- 协程+IO切换:gevent
- 基于事件循环的异步非阻塞框架:Twisted
- 手动实现协程:yield关键字生成器
def f1():
print(11)
yield
print(22)
yield
print(33) def f2():
print(55)
yield
print(66)
yield
print(77) v1 = f1()
v2 = f2() next(v1) # v1.send(None)
next(v2) # v1.send(None)
next(v1) # v1.send(None)
next(v2) # v1.send(None)
next(v1) # v1.send(None)
next(v2) # v1.send(None)

手动实现协程:yield关键字生成器


总结
重点总结:
1. 进程、线程、协程的区别? ********** 2. 写代码:gevent *****
from gevent import monkey
monkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换
import requests
import gevent def get_page1(url):
ret = requests.get(url)
print(url,ret.content) def get_page2(url):
ret = requests.get(url)
print(url,ret.content) def get_page3(url):
ret = requests.get(url)
print(url,ret.content) gevent.joinall([
gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1
gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 协程2
gevent.spawn(get_page3, 'https://github.com/'), # 协程3
]) 3. 写代码:twisted *****
from twisted.web.client import getPage, defer
from twisted.internet import reactor def all_done(arg):
reactor.stop() def callback(contents):
print(contents) deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
deferred = getPage(bytes(url, encoding='utf8'))
deferred.addCallback(callback)
deferred_list.append(deferred) dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done) reactor.run() 4. 异步非阻塞 5. IO多路复用
作用:可以监听所有的IO请求的状态。
- socket I,input
o,output
三种模式:
- select
- poll
- epoll

两周总结:
网络编程:
1. 网络基础
- 网卡
- IP
- ...
2. OSI 7层
3. 三次握手四次挥手
4. BS和CS架构?
5. socket基本代码
6. 黏包
7. 断点续传
8. 协议
自定义协议:{'code':10001,data:{...}}
Http协议:GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n 9. 面向对象+高级作业:反射/面向对象 并发编程:
1. 进程、线程、协程的区别? 2. 线程
- 基本写法
- 实例化
- 继承
- 锁
- RLock
...
- 线程池
3. 进程
- 基本写法
- 实例化
- 继承
- 锁
- RLock
...
- 线程池
- 进程数据共享 4. 协程
- 协程
- 协程+IO:gevent 5. IO多路复用 6. 异步/同步 阻塞/非阻塞

两周总结