Python Tornado篇

时间:2022-11-06 03:57:50

Tornado既是一个web server,也是web framework。而它作为web server 采用的是asynchronous IO的网络模型,这是一种很高效的模型。

Tornado 和现在的主流 Web 服务器框架(包括大多数 Python 的框架)有着明显的区别:它是非阻塞式服务器,而且速度相当快。得利于其 非阻塞的方式和对 epoll 的运用,Tornado 每秒可以处理数以千计的连接,这意味着对于实时 Web 服务来说,Tornado 是一个理想的 Web 框架。

同步IO操作导致请求进程阻塞,知道IO操作完成;异步IO操作不导致请求进程阻塞。

在Python中,同步IO可以被李杰为一个被调用的IO函数会阻塞调用函数的执行,而异步IO则不会阻塞调用函数执行。

安装pip3 install tornado

tornado网站架构简单示例:

import tornado.ioloop
import tornado.web class MainHandler(tornado.web.RequestHandler):
def get(self):
import time
time.sleep(10)
self.write("Hello, world") class IndexHandler(tornado.web.RequestHandler):
def get(self):
self.write("Index") application = tornado.web.Application([
(r"/main", MainHandler),
(r"/index", IndexHandler),
]) if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()

执行过程:

  • 第一步:执行脚本,监听 8888 端口
  • 第二步:浏览器客户端访问 /index  -->  http://127.0.0.1:8888/index
  • 第三步:服务器接受请求,并交由对应的类处理该请求
  • 第四步:类接受到请求之后,根据请求方式(post / get / delete ...)的不同调用并执行相应的方法
  • 第五步:方法返回值的字符串内容发送浏览器
#__author:  Administrator
#date: 2017/3/10
import tornado.web class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world") class LoginHandler(tornado.web.RequestHandler):
def get(self):
# 5.获取用户请求相关信息
# self.get_cookie()
# v = self.get_argument('p')
# print(v)
# self.request 封装了用户发来的所有请求
# print(type(self.request))
# from tornado.httputil import HTTPServerRequest # 6. 额外相应内容
# self.set_cookie('k1','v1')
# self.set_header('h1','v1') # 4. 返回页面+模版引擎
# self.render('login.html')
# self.render('login.html',k1='v1')
# self.render('login.html',k1='v1',k2='v2')
self.render('login.html',**{'k1':'v1','k2':[1,2,3,4],'k3':{'name':'root','age':18}})
# 7. 重定向
# self.redirect('/index/') def post(self, *args, **kwargs):
v = self.get_argument('user')
print(v)
self.redirect('http://www.autohome.com.cn')
import tor.uimethods as mt
from tor import uimodules as md # 8. 配置
settings = {
'static_path': 'static',
'static_url_prefix': '/sss/',
'template_path':'templates',
'ui_methods': mt,
'ui_modules': md,
}
# 1.生成路由规则
application = tornado.web.Application([(r"/index", MainHandler),(r"/login", LoginHandler),],**settings) if __name__ == "__main__":
# 2. 创建socket对象8888
# 将socket对象添加到select或epoll中
application.listen(8888)
# 3. 将select或epoll开始死循环 While True:
tornado.ioloop.IOLoop.instance().start()

异步非阻塞两种做法:

import tornado.ioloop
import tornado.web
from tornado import gen class MainHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
from tornado import httpclient
http = httpclient.AsyncHTTPClient()
yield http.fetch("http://www.google.com", self.done) def done(self, *args, **kwargs):
self.write('Main')
self.finish() class IndexHandler(tornado.web.RequestHandler):
def get(self):
self.write("Index") application = tornado.web.Application([
(r"/main", MainHandler),
(r"/index", IndexHandler),
]) if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()

使用AsyncHTTPClient

httpclient.AsyncHTTPClient()是异步访问,fetch函数会在调用后立即返回而不用等待实际访问的完成,从而导致get()也会立刻执行完成。

当访问实际完成后,AsyncHTTPClient会调用callback参数指定的函数,这里可以任意写代码。

装饰器 + Future 从而实现Tornado的异步非阻塞

import tornado.ioloop
import tornado.web
from tornado import gen
from tornado.concurrent import Future future = None
class MainHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
global future
future = Future()
future.add_done_callback(self.done)
yield future def done(self, *args, **kwargs):
self.write('Main')
self.finish() class IndexHandler(tornado.web.RequestHandler):
def get(self):
global future
future.set_result(None)
self.write("Index") application = tornado.web.Application([
(r"/main", MainHandler),
(r"/index", IndexHandler),
]) if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()

使用Future

当发送GET请求时,由于方法被@gen.coroutine装饰且yield 一个 Future对象,那么Tornado会等待,等待用户向future对象中放置数据或者发送信号,如果获取到数据或信号之后,就开始执行done方法。

异步非阻塞体现在当在Tornaod等待用户向future对象中放置数据时,还可以处理其他请求。

注意:在等待用户向future对象中放置数据或信号时,此连接是不断开的。

自定义异步非阻塞框架

import socket
import select class HttpRequest(object):
"""
用户封装用户请求信息
"""
def __init__(self, content):
"""
:param content:用户发送的请求数据:请求头和请求体
"""
self.content = content
self.header_bytes = bytes()
self.body_bytes = bytes()
self.header_dict = {}
self.method = ""
self.url = ""
self.protocol = ""
self.initialize()
self.initialize_headers() def initialize(self):
temp = self.content.split(b'\r\n\r\n', 1)
if len(temp) == 1:
self.header_bytes += temp
else:
h, b = temp
self.header_bytes += h
self.body_bytes += b @property
def header_str(self):
return str(self.header_bytes, encoding='utf-8') def initialize_headers(self):
headers = self.header_str.split('\r\n')
first_line = headers[0].split(' ')
if len(first_line) == 3:
self.method, self.url, self.protocol = headers[0].split(' ')
for line in headers:
kv = line.split(':')
if len(kv) == 2:
k, v = kv
self.header_dict[k] = v class Future(object):
def __init__(self):
self.result = None F = None def main(request):
global F
F = Future()
return F def stop(request):
global F
F.result = b"xxxxxxxxxxxxx"
return "stop" def index(request):
return "indexasdfasdfasdf" routers = [
('/main/',main),
('/index/',index),
('/stop/',stop),
] def run():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("127.0.0.1", 9999,))
sock.setblocking(False)
sock.listen(128)
inputs = []
inputs.append(sock)
async_request_dict = {
# 'socket': futrue
} while True:
rlist,wlist,elist = select.select(inputs,[],[],0.05)
for r in rlist:
if r == sock:
"""新请求到来"""
conn,addr = sock.accept()
conn.setblocking(False)
inputs.append(conn)
else:
"""客户端发来数据"""
data = b""
while True:
try:
chunk = r.recv(1024)
data = data + chunk
except Exception as e:
chunk = None
if not chunk:
break
# data进行处理:请求头和请求体
request = HttpRequest(data)
print(request.url)
print(request.method)
print(request.body_bytes)
# 1. 请求头中获取url
# 2. 去路由中匹配,获取指定的函数
# 3. 执行函数,获取返回值
# 4. 将返回值 r.sendall(b'alskdjalksdjf;asfd')
import re
flag = False
func = None
for route in routers:
if re.match(route[0],request.url):
flag = True
func = route[1]
break
if flag:
result = func(request)
if isinstance(result,Future):
async_request_dict[r] = result
else:
r.sendall(bytes(result,encoding='utf-8'))
inputs.remove(r)
r.close()
else:
r.sendall(b"")
inputs.remove(r)
r.close() for conn in async_request_dict.keys():
future = async_request_dict[conn]
if future.result:
conn.sendall(future.result)
conn.close()
del async_request_dict[conn]
inputs.remove(conn) if __name__ == '__main__':
run()