并发编程之socketserver模块

时间:2022-05-29 23:58:47

一、socketserver模块介绍

基于tcp套接字,关键的就是两个循环,一个是链接循环,一个是通信循环

socketserver模块中分两大类:server类(解决链接问题)和request类(解决通信问题)

源码分析总结:

基于tcp的socketserver我们自己定义的类中

1.    self.server即服务端对象(server实例),其socket属性(self.server.socket)才是监听套接字对象

2.    self.request即一个链接

3.    self.client_address即客户端地址

基于udp的socketserver我们自己定义的类中

  1.   self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  2.   self.client_address即客户端地址

二、socketserver应用

并发编程之socketserver模块并发编程之socketserver模块
 1 import socketserver
class MyTCPhandle(socketserver.BaseRequestHandler):
    """Echo handler that answers every received chunk with its upper-cased form.

    The class must inherit from ``socketserver.BaseRequestHandler`` and must
    implement ``handle()``.
    """

    def handle(self):
        """Serve one client until it disconnects or the socket fails.

        ``self.request`` plays the role of ``conn`` and ``self.client_address``
        the role of ``addr`` (e.g. ``('127.0.0.1', 51393)``) from a plain
        ``accept()`` call.
        """
        try:
            while True:
                data = self.request.recv(1024)  # blocking I/O
                if not data:
                    # An empty read means the peer closed the connection.
                    # Without this check the loop would spin forever.
                    break
                # sendall() keeps writing until every byte is out; send() may
                # transmit only part of the buffer.
                self.request.sendall(data.upper())
        except OSError:
            # Connection reset/aborted by the peer: nothing left to answer.
            pass
        finally:
            self.request.close()
if __name__ == '__main__':
    # Create the threading server without binding yet. allow_reuse_address is
    # only consulted inside server_bind(), so assigning it after a constructor
    # that already bound the socket would have no effect.
    server = socketserver.ThreadingTCPServer(
        ('127.0.0.1', 8080), MyTCPhandle, bind_and_activate=False
    )
    # Defaults to False; True makes server_bind() set SO_REUSEADDR on the
    # listening socket so the address can be reused.
    server.allow_reuse_address = True
    try:
        server.server_bind()
        server.server_activate()
        # Runs until interrupted; this replaces the hand-written accept loop.
        server.serve_forever()
    finally:
        server.server_close()
服务端
并发编程之socketserver模块并发编程之socketserver模块
 1 from socket import *
# Interactive echo client: sends each non-empty input line to the server and
# prints the reply.
client = socket(AF_INET, SOCK_STREAM)
try:
    client.connect(('127.0.0.1', 8080))
    while True:
        try:
            cmd = input('>>:').strip()
        except (EOFError, KeyboardInterrupt):
            # End of input or Ctrl-C: leave the loop so the socket is closed.
            break
        if not cmd:
            continue
        # sendall() guarantees the whole message is written.
        client.sendall(cmd.encode('utf-8'))
        data = client.recv(1024)
        if not data:
            # An empty read means the server closed the connection.
            break
        print('接受的是:%s' % data.decode('utf-8'))
finally:
    # The original close() sat after an infinite loop and was never reached.
    client.close()
客户端

三、上传下载文件(这里只是上传)

并发编程之socketserver模块并发编程之socketserver模块
 1 import socket
2 import struct
3 import json
4 import subprocess
5 import os
6
class MYTCPServer:
    """Single-connection-at-a-time TCP server that accepts file uploads.

    Each request on a connection consists of:

    1. 4 bytes: the header length packed with ``struct`` format ``'i'``
       (native byte order, so client and server must agree on it);
    2. the header: a JSON object encoded with ``coding``, for example
       ``{'cmd': 'put', 'filename': 'a.txt', 'filesize': 123123}``;
    3. for ``put``: exactly ``filesize`` bytes of file content.
    """

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    allow_reuse_address = False
    max_packet_size = 8192
    coding = 'utf-8'
    request_queue_size = 5
    server_dir = 'file_upload'
    # Only these method names may be invoked by a client. Dispatching on any
    # attribute name would let a remote peer call run(), server_close(), etc.
    allowed_commands = ('put',)
    # Upper bound for the JSON header so a bogus length prefix cannot make the
    # server buffer an arbitrary amount of data.
    max_header_size = 65536

    def __init__(self, server_address, bind_and_activate=True):
        """Create the listening socket and optionally bind and listen on it.

        If binding or activation fails the socket is closed and the original
        exception is re-raised.
        """
        self.server_address = server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except BaseException:
                self.server_close()
                raise

    def server_bind(self):
        """Bind the socket, setting SO_REUSEADDR first if requested."""
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        # Record the address actually bound.
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Start listening with a backlog of ``request_queue_size``."""
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Close the listening socket."""
        self.socket.close()

    def get_request(self):
        """Accept one connection; returns ``(conn, client_addr)``."""
        return self.socket.accept()

    def close_request(self, request):
        """Close a connection previously returned by ``get_request()``."""
        request.close()

    def run(self):
        """Accept clients forever, serving one connection at a time.

        Any error while serving a client ends that connection only; the
        connection is always closed before the next ``accept()``.
        """
        while True:
            self.conn, self.client_addr = self.get_request()
            print('from client ', self.client_addr)
            try:
                self._serve_connection()
            except Exception as exc:
                # Per-connection boundary: report and move on to the next
                # client instead of taking the whole server down.
                print('connection error:', exc)
            finally:
                self.close_request(self.conn)

    def _recv_exact(self, size):
        """Read exactly ``size`` bytes from ``self.conn``.

        Raises ConnectionError if the peer closes before ``size`` bytes arrive.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.conn.recv(remaining)
            if not chunk:
                raise ConnectionError('connection closed with %d bytes pending'
                                      % remaining)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _serve_connection(self):
        """Process requests on ``self.conn`` until the client disconnects."""
        while True:
            first = self.conn.recv(4)
            if not first:
                # Clean disconnect between requests.
                return
            # recv(4) may legally return fewer than 4 bytes; fetch the rest.
            head_struct = first + self._recv_exact(4 - len(first))

            head_len = struct.unpack('i', head_struct)[0]
            if not 0 < head_len <= self.max_header_size:
                raise ValueError('invalid header length: %r' % head_len)
            head_json = self._recv_exact(head_len).decode(self.coding)
            head_dic = json.loads(head_json)

            print(head_dic)
            # head_dic={'cmd':'put','filename':'a.txt','filesize':123123}
            cmd = head_dic['cmd']
            if cmd in self.allowed_commands:
                getattr(self, cmd)(head_dic)
            else:
                print('unsupported command:', cmd)

    def put(self, args):
        """Receive an uploaded file from ``self.conn`` into ``server_dir``.

        ``args`` is the decoded request header; ``args['filename']`` and
        ``args['filesize']`` are used. Only the final path component of the
        filename is kept, so a client cannot write outside ``server_dir``.

        Raises:
            ValueError: if the filename or filesize is unusable.
            ConnectionError: if the client disconnects before sending
                ``filesize`` bytes.
        """
        raw_name = args['filename']
        if not isinstance(raw_name, str):
            raise ValueError('invalid filename: %r' % (raw_name,))
        # Treat both separators as path separators regardless of platform,
        # then drop every directory component.
        filename = os.path.basename(raw_name.replace('\\', '/'))
        if filename in ('', '.', '..'):
            raise ValueError('invalid filename: %r' % (raw_name,))

        filesize = args['filesize']
        if not isinstance(filesize, int) or filesize < 0:
            raise ValueError('invalid filesize: %r' % (filesize,))

        os.makedirs(self.server_dir, exist_ok=True)
        file_path = os.path.normpath(os.path.join(self.server_dir, filename))

        recv_size = 0
        print('----->', file_path)
        with open(file_path, 'wb') as f:
            while recv_size < filesize:
                # Never ask for more than what is left of this file, otherwise
                # the read could swallow the start of the next request.
                want = min(self.max_packet_size, filesize - recv_size)
                recv_data = self.conn.recv(want)
                if not recv_data:
                    raise ConnectionError(
                        'upload truncated: got %s of %s bytes'
                        % (recv_size, filesize))
                f.write(recv_data)
                recv_size += len(recv_data)
                print('recvsize:%s filesize:%s' % (recv_size, filesize))
82
83
# Start the upload server on 127.0.0.1:8080 and serve until interrupted.
tcpserver1 = MYTCPServer(('127.0.0.1', 8080))
try:
    tcpserver1.run()
finally:
    # run() only returns by raising (e.g. Ctrl-C); release the listening
    # socket in that case.
    tcpserver1.server_close()
服务端
并发编程之socketserver模块并发编程之socketserver模块
 1 import socket
2 import struct
3 import json
4 import os
5
6
7
class MYTCPClient:
    """Interactive TCP client that uploads files with a ``put`` command.

    Each upload is sent as: 4 bytes holding the header length (``struct``
    format ``'i'``, native byte order), a JSON header encoded with ``coding``
    (``cmd``, ``filename``, ``filesize``), then the raw file bytes.
    """

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    allow_reuse_address = False
    max_packet_size = 8192
    coding = 'utf-8'
    request_queue_size = 5
    # Commands that may be typed at the prompt. Dispatching on any attribute
    # name would let "run" or "client_close" be called with a list argument.
    allowed_commands = ('put',)

    def __init__(self, server_address, connect=True):
        """Create the socket and optionally connect to ``server_address``.

        If connecting fails the socket is closed and the exception re-raised.
        """
        self.server_address = server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if connect:
            try:
                self.client_connect()
            except BaseException:
                self.client_close()
                raise

    def client_connect(self):
        """Connect the socket to ``self.server_address``."""
        self.socket.connect(self.server_address)

    def client_close(self):
        """Close the socket."""
        self.socket.close()

    def run(self):
        """Read commands from the prompt forever and dispatch them.

        The first word selects the command; the whole word list is passed to
        the command method.
        """
        while True:
            inp = input(">>: ").strip()
            if not inp:
                continue
            words = inp.split()
            cmd = words[0]
            if cmd in self.allowed_commands:
                getattr(self, cmd)(words)
            else:
                print('unknown command:', cmd)

    def put(self, args):
        """Upload the file named by ``args[1]`` to the server.

        ``args`` is the split command line, e.g. ``['put', 'a.txt']``. Prints a
        message and returns without sending anything when the filename is
        missing or is not an existing regular file.

        Raises:
            OSError: if the file yields fewer bytes than the size announced in
                the header, or on socket failure.
        """
        if len(args) < 2:
            print('usage: put <filename>')
            return
        cmd = args[0]
        filename = args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' % filename)
            return
        filesize = os.path.getsize(filename)

        head_dic = {'cmd': cmd, 'filename': os.path.basename(filename), 'filesize': filesize}
        print(head_dic)
        head_json_bytes = json.dumps(head_dic).encode(self.coding)

        head_struct = struct.pack('i', len(head_json_bytes))
        # sendall() retries until everything is written; a partial send()
        # would corrupt the framing.
        self.socket.sendall(head_struct)
        self.socket.sendall(head_json_bytes)

        send_size = 0
        with open(filename, 'rb') as f:
            # Fixed-size chunks: a binary file has no meaningful "lines".
            # Never send more than the size announced in the header.
            while send_size < filesize:
                chunk = f.read(min(self.max_packet_size, filesize - send_size))
                if not chunk:
                    raise OSError('file:%s ended after %s of %s bytes'
                                  % (filename, send_size, filesize))
                self.socket.sendall(chunk)
                send_size += len(chunk)
                print(send_size)
        print('upload successful')
# Connect to the upload server on 127.0.0.1:8080 and start the prompt.
client = MYTCPClient(('127.0.0.1', 8080))
try:
    client.run()
finally:
    # run() only returns by raising (Ctrl-C, end of input, socket error);
    # close the connection in that case.
    client.client_close()
客户端