Python学习_socket学习 & socketserver学习 & IO多路复用

时间:2021-11-26 05:18:22

简单的socket项目:

client端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

obj = socket.socket()

obj.connect(("127.0.0.1", 9999,))

# recv 也是阻塞的
ret_bytes = obj.recv(1024)
ret_str = str(ret_bytes, encoding="utf-8")
print(ret_str)
while True:
    inp = input("请输入要发送的内容:")
    if inp == "q":
        obj.sendall(bytes(inp, encoding="utf-8"))
        break
    else:
        obj.sendall(bytes(inp, encoding="utf-8"))
        ret = str(obj.recv(1024), encoding="utf-8")
        print(ret)
obj.close()

server端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

sk = socket.socket()
# 绑定ip和端口
sk.bind(("127.0.0.1", 9999,))
# 开启监听
sk.listen(5)
# 接收客户端的请求
while True:
    conn, address = sk.accept()  # accept阻塞
    conn.sendall(bytes("欢迎郭屌毛访问!", encoding="utf-8"))
    while True:
        # 接收客户端发过来的消息,限制1024个字节
        ret_bytes = conn.recv(1024)
        ret_str = str(ret_bytes, encoding="utf-8")
        if ret_str == "q":
            break
        # 给客户端发送消息
        conn.sendall(bytes(ret_str + "你好", encoding="utf-8"))

为解决socket的并发问题,使用socketserver:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# 解决socket并发问题,使用socketserver
import socketserver


# 定义一个class,必须继承 socketserver.BaseRequestHandler 类
class MyServer(socketserver.BaseRequestHandler):
    # 重写handle方法
    def handle(self):
        print(self.client_address)
        print(self.server)
        print(self.request)
        conn = self.request
        conn.sendall(bytes("欢迎访问xxx系统!", encoding="utf-8"))
        while True:
            ret_bytes = conn.recv(1024)
            ret_str = str(ret_bytes, encoding="utf-8")
            if ret_str == "q":
                break
            conn.sendall(bytes(ret_str + " 你好!", encoding="utf-8"))


if __name__ == '__main__':
    # 使用刚才的类创建server
    server = socketserver.ThreadingTCPServer(("127.0.0.1", 9999), MyServer)
    # serve_forever 等价于 while True,使server一直阻塞,等待连接
    server.serve_forever()

socketserver源码分析:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socketserver


class MyServer(socketserver.BaseRequestHandler):

    def handle(self):
        self.request


if __name__ == '__main__':
    # socket + select + 多线程
    # ip和端口,类名
    # MyServer == RequestHandlerClass
    # ThreadingTCPServer.init() => TCPServer.init() => BaseServer.init()
    # server 对象:
        # self.server_address == ("127.0.0.1", 9999)
        # self.RequestHandlerClass == MyServer
        # self.socket = 创建的socket对象
    server = socketserver.ThreadingTCPServer(("127.0.0.1", 9999), MyServer)
    # server对象的serve_forever(), 在 BaseServer 中找到serve_forever()
    # --------执行流程如下---------
    # BaseServer.serve_forever() => BaseServer._handle_request_noblock() => ThreadingMixIn.process_request()
    # => ThreadingMixIn.process_request_thread() => BaseServer.finish_request()
    # => self.RequestHandlerClass(request, client_address, self) 等价于 MyServer() 执行 BaseRequestHandler.init()方法
    server.serve_forever()

IO多路复用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select

# IO多路复用: 可以监听多个 文件描述符(socket对象)(文件句柄),一旦文件句柄出现变化,即可感知

sk1 = socket.socket()
sk1.bind(("127.0.0.1", 8001))
sk1.listen(5)

# sk2 = socket.socket()
# sk2.bind(("127.0.0.1", 8002))
# sk2.listen(5)
#
# sk3 = socket.socket()
# sk3.bind(("127.0.0.1", 8003))
# sk3.listen(5)

print("sk1 ", sk1)
inputs = [sk1, ]
outputs = []
message_dict = {}

while True:
    # select 内部自动监听sk1,sk2,sk3三个对象,一旦某个句柄发生变化
    # 解释:
    # select内部自动监听sk1,sk2,sk3三个对象,监听三个句柄是否发生变化,把发生变化的元素放入r_list中。
    # 如果有人连接sk1,则r_list = [sk1]
    # 如果有人连接sk1和sk2,则r_list = [sk1,sk2]
    # select中第1个参数表示inputs中发生变化的句柄放入r_list。
    # select中第2个参数表示[]中的值原封不动的传递给w_list。
    # select中第3个参数表示inputs中发生错误的句柄放入e_list。
    # 参数1表示1秒监听一次

    # 如果有人第一次来连接,sk1发送变化
    # r_list = [sk1]
    r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1)
    print("正在监听的socket对象:%d" % len(inputs))
    print(r_list)
    for sk_or_conn in r_list:
        # 每一个连接对象
        if sk_or_conn == sk1:
            # 表示有新用户来连接了
            conn, address = sk_or_conn.accept()
            # inputs = [sk1, conn, ....]
            inputs.append(conn)
            # message_dict = {"conn1":[], "conn2":[], ...}
            message_dict[conn] = []
        else:
            # 有老用户发消息了
            try:
                # 当用户连接中断的时候,data_bytes 为空 == 此情况适用于2.7版本
                data_bytes = sk_or_conn.recv(1024)
            except Exception as e:
                # 如果用户中断连接
                print(e)
                inputs.remove(sk_or_conn)
            else:
                # 用户正常发送消息
                data_str = str(data_bytes, encoding="utf-8")
                # sk_or_conn.sendall(bytes(res + " 你好!", encoding="utf-8"))
                message_dict[sk_or_conn].append(data_str)
                outputs.append(sk_or_conn)
    # w_list仅仅保存了谁给我发过消息
    for conn in w_list:
        # 这里可以优化,使用queue优化,后面再说
        recv_str = message_dict[conn][0]
        del message_dict[conn][0]
        conn.sendall(bytes(recv_str + " 你好!", encoding="utf-8"))
        # 发完消息后删除socket对象
        outputs.remove(conn)

    for sk_or_conn in e_list:
        inputs.remove(sk_or_conn)

# while True:
#     conn, address = sk.accept()
#     while True:
#         content_bytes = conn.recv(1024)
#         content_str = bytes(content_bytes, encoding="utf-8")
#         conn.sendall(bytes(content_str + "好", encoding="utf-8"))
#     conn.close()


# 一、概念
# 异步:某个事情需要10s完成。而我只需要调用某个函数告诉xxx来帮我做(然后我再干其他的事情)
# 同步:某个事情需要10s完成,我需要一直等它完成(等10s),再能继续后面的工作。
# 阻塞:做某件事情,直到完成,除非超时
# 非阻塞:尝试做,如果不能做,就不做(直接返回),如果能做,就做。
# 前两者和后两者不容易区分,不过前两者更多的有涉及到多线程交互(消息)的场景。
# 二、举个例子
# 小李喝了想喝水,于是去煮开水。
# 1、小李把水壶放到炉子上,等待水烧开。(同步阻塞)
# 小李感觉这样太费时间。
# 2、小李把水壶放到炉子上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
# 小李还是觉得自己这样太累,于是买了把会响笛的那种水壶。水开之后,能发出声音。
# 3、小李把响水壶放到炉子上,等待水壶发出声音。(异步阻塞)
# 觉得这样傻等意义不大
# 5、小李把响水壶放到炉子上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
# 这样真好。