今日所学
一.ftp上传简单实例
二.socketsever的固定用法
三.验证合法性连接
1.ftp上传实例
这个题目是我们现在网络编程比较基础一点的题目
下面我们只写简单上传的代码
上传服务端的代码
import socket
import struct
import json
sever=socket.socket()
ip_port=('127.0.0.1',8008)
sever.bind(ip_port)
sever.listen()
conn,addr=sever.accept()
#先接受4个字节,为文件的信息的长度
struct_len_info=conn.recv(4) # 拿到字典的长度,(拿到的结果为元组,从元组中把数据拿出来)
data_len=struct.unpack('i',struct_len_info)[0] # 然后根据拿到的长度,接收字典的字节类型
data_dict_bytes=conn.recv(data_len) # 将字节转换为json字符串类型
data_json=data_dict_bytes.decode('utf-8') # 将json字符串转换为字典
data_dict=json.loads(data_json) sum=0
#算出文件的路径
file_path='相对路径'+'\\'+data_dict['file_name']
with open(file_path,mode='wb') as f:
while sum<data_dict['file_size']: from_client_msg=conn.recv(1024) sum+=len(from_client_msg) f.write(from_client_msg)
上传客户端代码
import socket
import os
import struct
import json
client=socket.socket()
cleint.connect(('127.0.0.1',8008))
#读取文件的大小(读出来的内容是文件的字节类型)
file_size=os.path.getsize(r'相对路径')
# 在发送真实的文件之前,先将文件的信息包装成字典发送给服务端(文件名字,文件大小,文件路径)
file_info={
'file_name':'XXX.XXX',
'file_size' : file_size,
'file_LuJing': '文件的路径'
}
#将字典序列化成json字符串
file_info_str=json.dumps(file_info)
#将json字符串编码成字节
file_info_byte=file_info_str.encode('utf-8')
#求出字典的字节长度
info_len=len(file_info_byte)
# 打包
info_struct=struct.pack('i',info_len)
# 将打包好的信息和长度发送给服务端
client.send(info_struct+file_info_byte)
with open('你要发送的文件.xxx',mode='rb') as f:
#如果你累加的长度小于文件的长度,让他继续执行
while sum<file_size:
#每次读取的大小最大为1024
every_read=f.read(1024)
sum+=len(every_read)
#将读取到的数据发送给服务端
client.send(every_read)
注 : 上传和下载只是角色互换而已,上传为客户端向服务端中保存数据,而下载是客户端从服务端的内容中下载相应的文件.
2.socketsever的固定用法
话不多说,上代码
先上传服务端的代码
import socketsever
class 类名(socketsever.BaseRequestHandler):
# 注:这里的handle方法名是固定的,是不允许变的
def handle(self):
while 1:
# self.request相当于conn
from_client_msg=self.request.recv(1024)
print(from_client_msg.decode('utf-8'))
msg=input('服务端说:)
self.request.send(msg.encode('utf-8')) if __name__='__main__':
ip_port=('127.0.0.1',8008)
sever=socketsever.ThreadingTCPSever(ip_port,类名)
sever.serve_forever()
以下是客户端的代码
import socket
client=socket.socket()
client.connect(('127.0.0.1',8008))
while 1:
msg=input('客户端说>>>')
if msg=='byebye':
break
client.send(msg.encode('utf-8'))
from_sever_msg=client.recv(1024)
print(from_sever_msg.decode('utf-8')) client.close()
3. 验证合法性连接的服务端
服务端:
from socket import *
import hmac,os
#秘钥
secret_key=b'Jedan has a big key!'
def conn_auth(conn): print('开始验证新链接的合法性')
msg=os.urandom(32)#生成一个32字节的随机字符串
conn.sendall(msg)
h=hmac.new(secret_key,msg)
digest=h.digest()
respone=conn.recv(len(digest))
return hmac.compare_digest(respone,digest) def data_handler(conn,bufsize=1024):
if not conn_auth(conn):
print('该链接不合法,关闭')
conn.close()
return
print('链接合法,开始通信')
while True:
data=conn.recv(bufsize)
if not data:break
conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5):
'''
只处理链接
:param ip_port:
:return:
'''
tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(backlog)
while True:
conn,addr=tcp_socket_server.accept()
print('新连接[%s:%s]' %(addr[0],addr[1]))
data_handler(conn,bufsize) if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
# 自己定义接收大小
bufsize=1024
server_handler(ip_port,bufsize)
客户端:
from socket import *
import hmac,os secret_key=b'Jedan has a big key!'
def conn_auth(conn):
'''
验证客户端到服务器的链接
:param conn:
:return:
'''
msg=conn.recv(32)
h=hmac.new(secret_key,msg)
digest=h.digest()
conn.sendall(digest) def client_handler(ip_port,bufsize=1024):
tcp_socket_client=socket(AF_INET,SOCK_STREAM)
tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True:
data=input('>>: ').strip()
if not data:continue
if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8'))
respone=tcp_socket_client.recv(bufsize)
print(respone.decode('utf-8'))
tcp_socket_client.close() if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
client_handler(ip_port,bufsize)