python—day33 基于协程实现并发的套接字通信、网络IO操作、非阻塞IO模型、IO多路复用模型、异步IO;

时间:2021-07-12 00:00:21

基于协程实现并发的套接字通信:

  server

 1 from gevent import monkey,spawn;monkey.patch_all()
 2 from threading import Thread
 3 from socket import *
 4 
 5 def talk(conn):
 6     while True:
 7         try:
 8             data=conn.recv(1024)
 9             if not data:break
10             conn.send(data.upper())
11         except ConnectionResetError:
12             break
13     conn.close()
14 def server(ip,port,backlog=5):
15     s = socket()
16     s.bind((ip,port))
17     s.listen(backlog)
18     while True:
19         conn, addr = s.accept()
20         print(addr)
21         # 通信
22         g=spawn(talk,conn)
23     s.close()
24 
25 if __name__ == '__main__':
26     spawn(server,'127.0.0.1',8080).join()
27     # server(('127.0.0.1',8080))

  client

 1 from threading import Thread,current_thread
 2 from socket import *
 3 def client():
 4     client = socket()
 5     client.connect(('127.0.0.1', 8080))
 6     while True:
 7         data = '%s hello' % current_thread().name
 8         client.send(data.encode('utf-8'))
 9         res = client.recv(1024)
10         print(res.decode('utf-8'))
11 if __name__ == '__main__':
12     for i in range(1000):
13         t=Thread(target=client)
14         t.start()

 

网络IO操作:

  server

 1 from socket import *
 2 
 3 s = socket()
 4 s.bind(('127.0.0.1',8080))
 5 s.listen(5)
 6 
 7 while True:
 8     conn, addr = s.accept()
 9     print(addr)
10     while True:
11         try:
12             data = conn.recv(1024)
13             if not data: break
14             print('from client msg: ',data)
15         except ConnectionResetError:
16             break
17     conn.close()

 

  client

 1 from socket import *
 2 
 3 client = socket()
 4 client.connect(('127.0.0.1', 8080))
 5 
 6 while True:
 7 
 8     data = input('>>: ').strip()
 9     if not data:continue
10     client.send(data.encode('utf-8'))
11     print('has send')

 

 

阻塞IO模型:blockingIOError!

python—day33 基于协程实现并发的套接字通信、网络IO操作、非阻塞IO模型、IO多路复用模型、异步IO;

 

阻塞IO模型:NonblockingIOError!

python—day33 基于协程实现并发的套接字通信、网络IO操作、非阻塞IO模型、IO多路复用模型、异步IO;

 

  server

 1 from socket import *
 2 import time
 3 
 4 s = socket()
 5 s.bind(('127.0.0.1',8080))
 6 s.listen(5)
 7 s.setblocking(False)
 8 r_list = []
 9 w_list = []
10 while True:
11     try:
12         conn,addr = s.accept()
13         r_list.append(conn)
14 
15     except BlockingIOError:
16         # time.sleep(0.5)
17         print('干其他活了!')
18 
19         print('rlist:',len(r_list))
20 
21         #收消息
22         del_rlist = []
23         for conn in r_list:
24             try:
25                 data = conn.recv(1024)
26 
27                 #linux 系统
28                 if not data:
29                     conn.close()
30                     del_rlist.append(conn)
31                     continue
32 
33                 # conn.send(data.upper())
34                 # w_list.append((conn,data.upper()))
35             except BlockingIOError:
36                 continue
37 
38             except ConnectionResetError:
39                 conn.close()
40                 del_rlist.append(conn)
41 
42 
43         #发消息
44         del_wlist = []
45         for item in w_list:
46             try:
47                 conn= item[0]
48                 res= item[1]
49                 conn.send(res)
50                 del_wlist.append(item)
51             except BlockingIOError:
52                 continue
53             except ConnectionResetError:
54                 conn.close()
55                 del_wlist.append(conn)
56 
57         for conn in del_rlist:
58             r_list.remove(conn)
59 
60         for item in del_wlist:
61             w_list.remove(item)

 

  client

 1 from socket import *
 2 import os
 3 from threading import Thread
 4 
 5 client = socket()
 6 client.connect(('127.0.0.1',8080))
 7 
 8 while True:
 9    data = '%s say hello' %os.getpid()
10    client.send(data.encode('utf-8'))
11    res = client.recv(1024)
12    print(res.decode('utf-8'))

 

异步IO:

  

 1 from concurrent.futures import ThreadPoolExecutor
 2 from threading import current_thread
 3 import time
 4 import os
 5 
 6 def task(n):
 7     print('%s is running' %current_thread().name)
 8     time.sleep(2)
 9     return n**2
10 
11 def parse(obj):
12     res=obj.result()
13     print(res)
14 
15 if __name__ == '__main__':
16     t=ThreadPoolExecutor(4)
17 
18     future1=t.submit(task,1)
19     future1.add_done_callback(parse) #parse函数会在future1对应的任务执行完毕后自动执行,会把future1自动传给parse
20 
21     future2=t.submit(task,2)
22     future2.add_done_callback(parse)
23 
24     future3=t.submit(task,3)
25     future3.add_done_callback(parse)
26 
27     future4=t.submit(task,4)
28     future4.add_done_callback(parse)

 

 

IO多路复用模型:multiplexing

python—day33 基于协程实现并发的套接字通信、网络IO操作、非阻塞IO模型、IO多路复用模型、异步IO;

 

  server

 

 1 from socket import *
 2 import select
 3 
 4 s = socket()
 5 s.bind(('127.0.0.1',8080))
 6 s.listen(5)
 7 s.setblocking(False)
 8 # print(s)
 9 
10 r_list=[s,]
11 w_list=[]
12 w_data={}
13 while True:
14     print('被检测r_list: ',len(r_list))
15     print('被检测w_list: ',len(w_list))
16     rl,wl,xl=select.select(r_list,w_list,[],) #r_list=[server,conn]
17 
18     # print('rl: ',len(rl)) #rl=[conn,]
19     # print('wl: ',len(wl))
20 
21     # 收消息
22     for r in rl: #r=conn
23         if r == s:
24             conn,addr=r.accept()
25             r_list.append(conn)
26         else:
27             try:
28                 data=r.recv(1024)
29                 if not data:
30                     r.close()
31                     r_list.remove(r)
32                     continue
33                 # r.send(data.upper())
34                 w_list.append(r)
35                 w_data[r]=data.upper()
36             except ConnectionResetError:
37                 r.close()
38                 r_list.remove(r)
39                 continue
40 
41     # 发消息
42     for w in wl:
43         w.send(w_data[w])
44         w_list.remove(w)
45         w_data.pop(w)

 

 

  client

 1 from socket import *
 2 import os
 3 
 4 client = socket()
 5 client.connect(('127.0.0.1', 8080))
 6 
 7 while True:
 8     data='%s say hello' %os.getpid()
 9     client.send(data.encode('utf-8'))
10     res=client.recv(1024)
11     print(res.decode('utf-8'))