Web服务器 / Web Server
对于Web来说,需要建立一个Web服务器,必须建立一个基本的服务器和一个处理程序,
基本服务器的主要作用是,在客户端和服务器端完成必要的HTTP交互,
处理程序的主要作用是,处理客户端的请求,并返回适当的文件,包括静态/动态文件。
1 建立服务器 / Setup Server
在Python中,有一个内置http包,包含了server和client模块,其中server模块包含了几个类,分别为基本服务器类HTTPServer(基类为socketserver.TCPServer),和三个处理程序类,BaseHTTPRequestHandler类(基类为socketserver.StreamRequestHandler),SimpleHTTP-RequestHandler类(基类为BaseHTTPRequestHandler)以及CGIHTTPRequestHandler类(基类为SimpleHTTPRequestHandler),以及一个常用的test函数。
Note: 在Python3之前,这三个类在三个独立模块中,分别为BaseHTTPServer(包含HTTPServer类),SimpleHTTPServer,CGIHTTPServer。对于基本处理程序的类来说,可以通过重定义以下几个方法: do_GET/do_POST/do_HEAD等来实现相应处理功能。
1.1 基本服务器
利用http.server提供的HTTPServer和BaseHTTPRequestHandler两个类建立一个简单的服务器及处理程序。
完整代码
from http.server import BaseHTTPRequestHandler, HTTPServer # http://localhost:80/first_html.html to GET file
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
try:
f = open(self.path[1:], 'r')
self.send_response(200)
self.send_header('Content-type', 'text/html'.encode())
self.end_headers()
self.wfile.write(f.read().encode())
f.close()
except IOError:
self.send_error(404,
'File NOt Found: %s' % self.path) def main():
try:
server = HTTPServer(('', 80), MyHandler)
print('Welcome to the machine... Press ^C once or twice to quit.')
server.serve_forever()
except:
print('^C received, shutting down server')
server.socket.close() if __name__ == '__main__':
main()
分段解释
首先导入所需的模块,通过继承基本处理程序类,并重载do_GET方法来实现对GET的处理。其中send_response()发送http状态码,send_header()发送http头部信息,end_headers()添加头部结束表示符,通过wfile的write函数将指定路径读取的内容传给用户。
from http.server import BaseHTTPRequestHandler, HTTPServer # http://localhost:80/first_html.html to GET file
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
try:
f = open(self.path[1:], 'r')
self.send_response(200)
self.send_header('Content-type', 'text/html'.encode())
self.end_headers()
self.wfile.write(f.read().encode())
f.close()
except IOError:
self.send_error(404,
'File NOt Found: %s' % self.path)
定义一个主函数用于启动服务器,其中ip为localhost,端口为80,传入自定义的处理程序。启动服务器后,可以使用Web客户端(各种浏览器),输入相应的url(确保GET的文件与url指定的文件匹配),如http://localhost:80/first_html.html来获取网页文件信息。
def main():
try:
server = HTTPServer(('', 80), MyHandler)
print('Welcome to the machine... Press ^C once or twice to quit.')
server.serve_forever()
except:
print('^C received, shutting down server')
server.socket.close() if __name__ == '__main__':
main()
通过上面的简单示例可以对服务器进行进一步的扩展,从而实现更多的功能。
1.2 CGI服务器
接下来介绍利用http.server中的CGIHTTPRequestHandler配合HTTPServer来建立一个CGI服务器,
# e.g.:
# Connect following link to GET HTML
# http://localhost:8000/static_screen.html
from http.server import CGIHTTPRequestHandler, test
test(CGIHTTPRequestHandler)
利用http的CGI服务器的建立十分简单,只需要导入处理模块和test函数,再运行即可。
Note: 此处值得注意的有以下两点,
- 在CGIHTTPRequestHandler中,设置了默认的cgi目录为/cgi-bin(以及/htbin),因此需要在脚本所在目录下建立相应的cgi-bin文件夹,并存放相应cgi文件,这样才会运行cgi脚本,否则只会当做普通的文本文件返回;
- test函数中,默认使用HTTPServer作为基本服务器,使用localhost以及端口8000作为连接参数,协议为HTTP/1.0,处理程序默认为BaseHTTPRequest-Handler。
2 通用网关接口CGI / Common Gateway Interface
2.1 CGI简介
Common Gateway Interface(CGI)是一种定义的标准接口,用于在外部应用程序(CGI程序)和Web服务器之间的信息传递,CGI的规范允许Web服务器执行外部程序,并将这些外部程序的输出发送给Web浏览器。其主要工作流程如下,
- 浏览器通过HTML表单或超链接请求指向一个CGI应用程序的URL;
- 服务器收到请求;
- 服务器执行指定CGI应用程序;
- CGI应用程序执行所需要的操作,通常是基于浏览者输入的内容;
- CGI应用程序把结果格式化为网络服务器和浏览器能够理解的文档(通常是HTML网页);
- 网络服务器把结果返回到浏览器中。
值得注意的是,CGI应用程序时运行在服务器系统上的,执行时会消耗服务器的CPU和内存。同时不完善的CGI应用程序还有可能成为非法进入服务器的通道。目前,一般的生产环境都已不再使用CGI。
2.2 启动CGI服务器
首先,需要打开CGI的服务器,可以选择直接运行前面的CGI服务器脚本或在命令行中通过下列命令直接启动服务器,
python -c "from http.server import CGIHTTPRequestHandler, test;test(CGIHTTPRequestHandler)"
2.3 CGI应用程序
接下来介绍在服务器运行的前提下,使用CGI程序实现客户端与服务器之间的交互功能。
2.3.1 静态表单与CGI结果页面
利用静态的html页面来生成表单,同时在html中定义按钮触发cgi程序运行,生成返回的结果表单。
具体效果如下,
在浏览器中输入url: http://localhost:8000/static_screen.html,将会显示基本页面如下,
通过填好名字后单击submit按钮得到返回页面如下,
返回页面的url为,http://localhost:8000/cgi-bin/cgi_sleep.py?person=LIKE&howmany=7,通过该url可以看到,cgi应用程序的位置,以及请求表单提供的信息(person=LIKE,howmany=7)。
实现上述功能,首先需要定义静态的表单页面,首先定义头部标题,随后显示h3,定义表单的action指向cgi-bin中的cgi应用程序,再创建几个表单(radio类型)输入以及一个提交按钮(submit类型),html代码如下,
<html>
<head>
<title>
Sleep(Static Screen CGI demo)
</title>
</head>
<body>
<h3>
Sleep list for: <i>NEW USER</i>
</h3>
<form action="/cgi-bin/cgi_sleep.py">
<b>Enter your Name:</b>
<input type=text name=person value="NEW USER" size=15>
<p>
<b>How many hours you sleep a day?</b>
<input type=radio name=howmany value='5' checked> 5h
<input type=radio name=howmany value='6'> 6h
<input type=radio name=howmany value='7'> 7h
<input type=radio name=howmany value='8'> 8h
<input type=radio name=howmany value='9'> 9h
<p><input type=submit> <--Click
</form>
</body>
</html>
定义好表单之后,还需要定义表单中指向的cgi程序,
代码如下
import cgi reshtml = '''Content-Type: text/html\n
<html><head><title>
Sleep(Dynamic Screen CGI demo)
</title></head>
<body><h3>sleep list for: <i>%s</i></h3>
Your name is: <b>%s</b><p>
You sleep <b>%s</b> hours a day.
</body></html>''' # cgi.FieldStorage will catch the form from web client
form = cgi.FieldStorage() # Get the value
who = form['person'].value
howmany = form['howmany'].value # print should be html format, and will be return back to web client
print(reshtml % (who, who, howmany))
首先导入cgi模块,同时定义返回的html页面,其中包括返回的头部类型,text/html
cgi模块的FiledStorage类实现了CGI的数据交互功能,首先对其进行实例化,得到的实例中包含client提交的表单信息内容,可以直接通过键值取值获取,而cgi应用程序最终输出(print)的结果将会被返回给客户端。
2.3.2 CGI表单和结果页面
2.3.2.1 基本版本
除了静态表单外,还可以利用CGI程序动态生成表单和最终的结果页面,将下面的cgi程序放在cgi-bin目录下,通过浏览器的url指向该脚本,由于CGIServer中进行了判断会对py(以及pyc)文件进行运行,因此可以直接调用cgi程序。否则将返回文本信息。
完整代码
"""
Run Server first
Connect address: Http://localhost:8000/cgi-bin/cgi_combined_primary.py
"""
import cgi # Http header
header = 'Content-Type: text/html\n\n' # Html form
formhtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>NEW USER</i></h3>
<form action='/cgi-bin/cgi_combined_primary.py'>
<b>Enter your Name: </b>
<input type=hidden name=action value=edit>
<input type=text name=person value='NEW USER' SIZE=15>
<p><b>How many hours you sleep a day?</b>
%s
<p><input type=submit></form></body></html>''' fradio = '<input type=radio name=howmany value="%s" %s> %s\n' def showForm():
sleep = []
for i in range(5, 10):
checked = ''
if i == 5:
checked = 'CHECKED'
sleep.append(fradio % (str(i), checked, str(i)))
print('%s%s' % (header, formhtml % ''.join(sleep))) reshtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>%s</i></h3>
Your name is: <b>%s</b><p>
You sleep <b>%s</b> hours a day.
</body></html>''' def doResults(who, howmany):
print(header + reshtml % (who, who, howmany)) def process():
form = cgi.FieldStorage()
if 'person' in form:
who = form['person'].value # resp
else:
who = 'NEW USER' # show if 'howmany' in form:
howmany = form['howmany'].value # resp else:
howmany = 0 # show # Use action label to judge show or response
if 'action' in form:
doResults(who, howmany) # resp
else:
showForm() # show if __name__ == '__main__':
process()
最终页面的显示结果如下
输入用户名点击提交后显示如下界面,
分段解释
导入模块后,定义http的头部,以及表单页面的html代码,其中需要注意的是,15行中的一个表单类型为hidden,即为不可见的,并且起名为action,值为edit,其目的在于用作一个判断标志,用于判断是一个新提交表单还是一个返回结果的表单,从而执行相应的函数进行处理。
"""
Run Server first
Connect address: Http://localhost:8000/cgi-bin/cgi_combined_primary.py
"""
import cgi # Http header
header = 'Content-Type: text/html\n\n' # Html form
formhtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>NEW USER</i></h3>
<form action='/cgi-bin/cgi_combined_primary.py'>
<b>Enter your Name: </b>
<input type=hidden name=action value=edit>
<input type=text name=person value='NEW USER' SIZE=15>
<p><b>How many hours you sleep a day?</b>
%s
<p><input type=submit></form></body></html>''' fradio = '<input type=radio name=howmany value="%s" %s> %s\n'
showForm()函数用于添加表单输入框,
def showForm():
sleep = []
for i in range(5, 10):
checked = ''
if i == 5:
checked = 'CHECKED'
sleep.append(fradio % (str(i), checked, str(i)))
print('%s%s' % (header, formhtml % ''.join(sleep)))
接着定义返回的html,以及返回处理函数,用于将返回的结果添加到返回html中。
reshtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>%s</i></h3>
Your name is: <b>%s</b><p>
You sleep <b>%s</b> hours a day.
</body></html>''' def doResults(who, howmany):
print(header + reshtml % (who, who, howmany))
最后定义过程函数,首先实例化cgi生成表单实例,此时,判断表单中是否有person和howmany项,若有则说明是提交的表单页,则获取表单信息,没有则说明是初次提供的表单页,设置默认值。接着利用action标志判断是哪一种表单,执行相应处理程序。此处也可以依靠person和howmany字段来判断,但使用action字段更清晰明了。
def process():
form = cgi.FieldStorage()
if 'person' in form:
who = form['person'].value # resp
else:
who = 'NEW USER' # show if 'howmany' in form:
howmany = form['howmany'].value # resp else:
howmany = 0 # show # Use action label to judge show or response
if 'action' in form:
doResults(who, howmany) # resp
else:
showForm() # show if __name__ == '__main__':
process()
2.3.2.2 升级版本
对上面生成表单和结果页面的CGI程序进行升级,添加或改变了以下几个功能,
- 不设置默认勾选选项,当未选择选项时,返回报错页面提示错误;
- 设置返回锚,输入信息后单击返回锚会返回原页面,并自动填入之前输入的信息。
完整代码
"""
Run Server first
Connect address: Http://localhost:8000/cgi-bin/cgi_combined_advanced.py
"""
import cgi
from urllib.parse import quote_plus # Http header
header = 'Content-Type: text/html\n\n'
url = '/cgi-bin/cgi_combined_advanced.py' errhtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>ERROR</h3>
<b>%s</b><p>
<form><input Type=button value=Back onclick="window.history.back()"</form>
</body></html>''' def showError(error_str):
print(header + errhtml % error_str) formhtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>%s</i></h3>
<form action="%s">
<b>Enter your Name: </b>
<input type=hidden name=action value=edit>
<input type=text name=person value="%s" SIZE=15>
<p><b>How many hours you sleep a day?</b>
%s
<p><input type=submit></form></body></html>''' fradio = '<input type=radio name=howmany value="%s" %s> %s\n' def showForm(who, howmany):
sleep = []
for i in range(5, 10):
checked = ''
if str(i) == howmany:
checked = 'CHECKED'
sleep.append(fradio % (str(i), checked, str(i)))
print('%s%s' % (header, formhtml % (who, url, who, ''.join(sleep)))) reshtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>%s</i></h3>
Your name is: <b>%s</b><p>
You sleep <b>%s</b> hours a day.
<p>Click <a href="%s">here</a> to edit your data again.
</body></html>''' def doResults(who, howmany):
newurl = url + '?action=reedit&person=%s&howmany=%s' % (quote_plus(who), howmany)
print(header + reshtml % (who, who, howmany, newurl)) def process():
error = ''
form = cgi.FieldStorage() if 'person' in form:
who = form['person'].value.title() # Resp
else:
who = 'NEW USER' # Show if 'howmany' in form:
howmany = form['howmany'].value # Resp
else:
if 'action' in form and form['action'].value == 'edit':
error = 'Please select hours of sleep.' # Error
else:
howmany = 0 # Show if not error:
if 'action' in form and form['action'].value != 'reedit':
doResults(who, howmany) # Resp
else:
showForm(who, howmany) # Show
else:
showError(error) # Error if __name__ == '__main__':
process()
运行服务器并输入指定的URL后,可以看到如下页面,与基本版本的页面相近,但是没有勾选默认选项。
若此时不选择任何选项,直接单击提交按钮,则CGI程序会返回如下的错误提示页面,此时单击返回按钮则会返回之前的页面。
返回之前页面填入名字勾选选项后,会显示结果页面如下,此时可以通过点击here锚链接返回之前选择的页面。
分段解释
首先导入模块,定义通用的http头部以及url的path信息,
"""
Run Server first
Connect address: Http://localhost:8000/cgi-bin/cgi_combined_advanced.py
"""
import cgi
from urllib.parse import quote_plus # Http header
header = 'Content-Type: text/html\n\n'
url = '/cgi-bin/cgi_combined_advanced.py'
定义错误提示页面的html,设置一个表单后退按钮(JavaScript),单击后返回前页面,由于按钮是输入型,因此需要一个表单。
再定义显示错误函数,返回html头部以及错误提示页面html以及错误信息。
errhtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>ERROR</h3>
<b>%s</b><p>
<form><input Type=button value=Back onclick="window.history.back()"</form>
</body></html>''' def showError(error_str):
print(header + errhtml % error_str)
接着定义显示页面,以及显示函数,与前面基本版本的类似。
formhtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>%s</i></h3>
<form action="%s">
<b>Enter your Name: </b>
<input type=hidden name=action value=edit>
<input type=text name=person value="%s" SIZE=15>
<p><b>How many hours you sleep a day?</b>
%s
<p><input type=submit></form></body></html>''' fradio = '<input type=radio name=howmany value="%s" %s> %s\n' def showForm(who, howmany):
sleep = []
for i in range(5, 10):
checked = ''
if str(i) == howmany:
checked = 'CHECKED'
sleep.append(fradio % (str(i), checked, str(i)))
print('%s%s' % (header, formhtml % (who, url, who, ''.join(sleep))))
再定义结果返回页面,此处一个明显的区别在于多了一个锚,以here显示,链接锚指向新的url(在原url中接入了新的表单提交信息,设置action为reedit,person和howmany为之前表单提交值)。
reshtml = '''<html><head><title>
Sleep CGI Demo</title></head>
<body><h3>Sleep list for: <i>%s</i></h3>
Your name is: <b>%s</b><p>
You sleep <b>%s</b> hours a day.
<p>Click <a href="%s">here</a> to edit your data again.
</body></html>''' def doResults(who, howmany):
newurl = url + '?action=reedit&person=%s&howmany=%s' % (quote_plus(who), howmany)
print(header + reshtml % (who, who, howmany, newurl))
最后定义过程函数,首先是error信息为空,实例化表单类,对表单实例进行判断,如果person是返回表单,则将表单人名大写,再判断howmany字段,当存在时,说明是返回结果表单,若不存在,判断action是新表单还是错误表单。最后根据error和action字段进行执行相应函数。
def process():
error = ''
form = cgi.FieldStorage() if 'person' in form:
who = form['person'].value.title() # Resp
else:
who = 'NEW USER' # Show if 'howmany' in form:
howmany = form['howmany'].value # Resp
else:
if 'action' in form and form['action'].value == 'edit':
error = 'Please select hours of sleep.' # Error
else:
howmany = 0 # Show if not error:
if 'action' in form and form['action'].value != 'reedit':
doResults(who, howmany) # Resp
else:
showForm(who, howmany) # Show
else:
showError(error) # Error if __name__ == '__main__':
process()
2.3.3 进阶功能CGI
利用CGI程序完成一个包含以下几个特性的服务器交互功能,包括,
- Cookie的使用;
- 同一个CGI字段的多个值;
- 利用multipart表单提交方式实现文件上传。
multipart
目前CGI中特别指出只允许存在两种表单编码,分别为“application/x-www-form-urlencoded”和“multipart/form-data”两种,在提交一个表单时,可以对表单的编码方式encode-type进行声明,默认编码方式为第一种,因此下面的声明是可以省略的,
<form enctype="application/x-www-form-urlencoded" ...>
而当需要用到multipart表单时,则需要声明multipart编码方式,声明方式如下,
<form enctype=" multipart/form-data" ...>
在提交表单时可以使用上述的任意一种,而上传文件时只能使用multipart编码方式,接着通过输入文件类型完成文件上传,
<input type=file name=...>
这段指令可以显示一个空的文本框,同时旁边有个按钮,可以通过按钮浏览文件找到上传文件。
多值字段
多值字段的常见情况为有一系列的复选框允许用户进行多选,每个复选框都有相同的字段名,但为了区分这些复选框,会使用不同的值与特定的复选框关联。
当复选框多选时,一个键将会对应多个值,在这种情况下cgi模块会建立一个包含这些类实例的列表,可以通过遍历获取所有的值。
Cookie
由于HTTP是一个无状态信息的协议,因此为了保持在多个页面之间浏览的连续性,则需要使用cookie,服务器会向客户端发送一个请求来保存cookie,从而让用户的信息数据等保存在客户端处,而不需要占用服务器存储或利用服务器页面嵌入数据的方式来保持数据。
在客户端获得请求文件之前,Web服务器会向客户端发送“Set-Cookie”头文件来要求客户端存储cookie。一旦客户端建立起了cookie,HTTP_COOKIE环境变量会将那些cookie自动放到请求中发送给服务器。cookie是以分号分隔的键值对存在的,即以分号‘;’分隔各个键值对,每个键值对中间都由等号‘=’分开。为此需要进行手动拆分解析。
功能实现
为实现上述几个功能,需要一个稍微复杂一点的CGI程序,具体代码如下,
完整代码
from cgi import FieldStorage
from os import environ
from io import StringIO
from urllib.parse import quote, unquote class AdvCGI(object):
header = 'Content-Type: text/html\n\n'
url = '/cgi-bin/cgi_advanced.py' formhtml = '''<html><head><title>
Advanced CGI Demo</title></head>
<body><h2>Advanced CGI Demo Form</h2>
<form method=post action="%s" enctype="multipart/form-data">
<h3>My Cookie Setting</h3>
<li><code><b>CPPuser = %s</b></code>
<h3>Enter cookie value<br>
<input name=cookie value="%s"> (<i>optional</i>)</h3>
<h3>Enter your name<br>
<input name=person value="%s"> (<i>required</i>)</h3>
<h3>What languages can you program in?
(<i>at least one required<i>)</h3>
%s
<h3>Enter file to upload <small>(max size 4K)</small></h3>
<input type=file name=upfile value="%s" size=45>
<p><input type=submit>
</form></body></html>''' langSet = ('Python', 'Ruby', 'Java', 'C++', 'PHP', 'C', 'JavaScript')
# Set checkbox for language items,
# First %s for real value, second one for CHECKED or not, third one for presented value
langItem = '<input type=checkbox name=lang value="%s"%s> %s\n' # Read cookies from client
def getCPPCookies(self):
# environ contains the environment info, similar to dict usage
# Check whether cookie exists
if 'HTTP_COOKIE' in environ:
# environ['HTTP_COOKIE'] could be like: 'CPPinfo=Like%3APython%2CJava%3AC%3A\path...; CPPuser=Self'
# %3A -> :
# %2C -> ,
cookies = [x.strip() for x in environ['HTTP_COOKIE'].split(';')]
# cookies = ['CPPinfo=Like%3APython%2CJava%3AC%3A\path...', 'CPPuser=Self']
for eachCookie in cookies:
if len(eachCookie) > 6 and eachCookie[:3]=='CPP':
# tag -> info / user
tag = eachCookie[3:7]
try:
self.cookies[tag] = eval(unquote(eachCookie[8:]))
except (NameError, SyntaxError):
self.cookies[tag] = unquote(eachCookie[8:])
if 'info' not in self.cookies:
self.cookies['info'] = ''
if 'user' not in self.cookies:
self.cookies['user'] = ''
else:
self.cookies['info'] = self.cookies['user'] = '' if self.cookies['info'] != '':
print(self.cookies['info'])
try:
self.who, langStr, self.fn = self.cookies['info'].split(' : ')
except:
self.who = self.fn = self.cookies['info']
langStr = ' '
self.langs = langStr.split(',')
else:
self.who = self.fn = ' '
self.langs = ['Python'] def showForm(self):
self.getCPPCookies() # Put together language checkboxes
langStr = []
for eachLang in AdvCGI.langSet:
# Add CHECKED if language name exists in cookies
langStr.append(AdvCGI.langItem % (eachLang, 'CHECKED' if eachLang in self.langs else '', eachLang)) # See if user cookie set up yet
if not ('user' in self.cookies and self.cookies['user']):
cookStatus = '<i>(cookies has not been set yet)</i>'
userCook = ''
else:
userCook = cookStatus = self.cookies['user'] print('%s%s' % (AdvCGI.header, AdvCGI.formhtml % (AdvCGI.url, cookStatus,
userCook, self.who, ''.join(langStr), self.fn))) errhtml = '''<html><head><title>
Advanced CGI Demo</title></head>
<body><h3>Error</h3>
<b>%s</b><p>
<form><input type=button value=back onclick='window.history.back()'></form>
</body></html>''' def showError(self):
print(AdvCGI.header + AdvCGI.errhtml % (self.error)) # Some html tags:
# <li>: for list
# <ol>: odered list
# <ul>: unodered list
# <br>: newline
reshtml = '''<html><head><title>
Advanced CGI Demo</title></head>
<body><h2>Your Uploaded Data</h2>
<h3>Your cookie value is: <b>%s</b></h3>
<h3>Your name is: <b>%s</b></h3>
<h3>You can program in the following languages:</h3>
<ul>%s</ul>
<h3>Your uploaded file...<br>
Name: <i>%s</i><br>
Contents:</h3>
<pre>%s</pre>
Click <a href="%s"><b>here</b></a> to return to form.
</body></html>''' # Tell client to store cookies
def setCPPCookies(self):
for eachCookie in self.cookies.keys():
print('Set-cookie: CPP%s=%s; path=/' % (eachCookie, quote(self.cookies[eachCookie]))) # Display results page
def doResults(self):
MAXBYTES = 4096
langList = ''.join('<li>%s<br>' % eachLang for eachLang in self.langs)
filedata = self.fp.read(MAXBYTES)
# Check file size
if len(filedata) == MAXBYTES and self.fp.read():
filedata = '%s%s' % (filedata,
'...<b><i>(file truncated due to size)</i></b>')
self.fp.close()
if filedata == '':
filedata = '<b><i>(file not given or upload error)</i></b>'
filename = self.fn # See if user cookie set up yet
if not ('user in self.cookies and self.cookies["user"]'):
cookStatus = '<i>(cookie has not been set yet)</i>'
userCook = ''
else:
userCook = cookStatus = self.cookies['user'] # Set cookies
# Use ' : ' rather than ':' to join, because filename may contains ':' like 'c:\windows\...'
self.cookies['info'] = ' : '.join((self.who, ','.join(self.langs), filename))
self.setCPPCookies() print('%s%s' % (AdvCGI.header, AdvCGI.reshtml % (cookStatus, self.who, langList,
filename, filedata, AdvCGI.url))) # Determine which page to return
def go(self):
self.cookies = {}
self.error = ''
form = FieldStorage()
# No form received
if not form.keys():
self.showForm()
return if 'person' in form:
self.who = form['person'].value.strip().title()
if self.who == '':
self.error = 'Your name is required. (blank)'
else:
self.error = 'Your name is required. (missing)' self.cookies['user'] = unquote(form['cookie'].value.strip()) if 'cookie' in form else ''
if 'lang' in form:
langData = form['lang']
if isinstance(langData, list):
self.langs = [eachLang.value for eachLang in langData]
else:
self.langs = [langData.value]
else:
self.error = 'At least one language required.' if 'upfile' in form:
upfile = form['upfile']
# form['upfile'] ->.filename for file name, .file for file data
self.fn = upfile.filename or ''
if upfile.file:
self.fp = upfile.file
else:
self.fp = StringIO('(no data)')
else:
# StringIO as data container
self.fp = StringIO('(no file)')
self.fn = '' if not self.error:
self.doResults()
else:
self.showError() if __name__ == '__main__':
page = AdvCGI()
page.go()
首先连接到服务器打开页面可以看到下图所示界面,
图中可以看出,页面包括一个cooike信息,用户名可选填项,你的名字必选项,多语言复选项以及需要上传的文件选择。
当不填写名字或未勾选内容时,会返回如下的错误信息页面。
填好信息后可以看到结果页面显示了选填的信息,并且显示了上传文件的内容。
点击here按钮会返回之前的页面,同时由于cookie的存在,之前填入的信息都被保存了下来。
分段解析
首先导入所需的模块,cgi用于处理cgi交互,environ用于查看当前环境变量,主要用于查看HTTP_COOKIE使用,StringIO可作为字符串容器,最后的quote和unquote函数用于转换特定字符。
from cgi import FieldStorage
from os import environ
from io import StringIO
from urllib.parse import quote, unquote
接着定义一个AdvCGI类,设置通用的头部参数和url,以及表单页面(其中表单中定义了multipart编码方式以便于文件上传),表单语言输入项,checkbox的html片段。
class AdvCGI(object):
header = 'Content-Type: text/html\n\n'
url = '/cgi-bin/cgi_advanced.py' formhtml = '''<html><head><title>
Advanced CGI Demo</title></head>
<body><h2>Advanced CGI Demo Form</h2>
<form method=post action="%s" enctype="multipart/form-data">
<h3>My Cookie Setting</h3>
<li><code><b>CPPuser = %s</b></code>
<h3>Enter cookie value<br>
<input name=cookie value="%s"> (<i>optional</i>)</h3>
<h3>Enter your name<br>
<input name=person value="%s"> (<i>required</i>)</h3>
<h3>What languages can you program in?
(<i>at least one required<i>)</h3>
%s
<h3>Enter file to upload <small>(max size 4K)</small></h3>
<input type=file name=upfile value="%s" size=45>
<p><input type=submit>
</form></body></html>''' langSet = ('Python', 'Ruby', 'Java', 'C++', 'PHP', 'C', 'JavaScript')
# Set checkbox for language items,
# First %s for real value, second one for CHECKED or not, third one for presented value
langItem = '<input type=checkbox name=lang value="%s"%s> %s\n'
在类中定义一个从客户端获取cookie的方法,首先利用os.environ类查看是否含有HTTP_COOKIE参数,如果有则说明包含cookie信息,通过os.environ[‘HTTP_COOKIE’]可以获取到cookie的信息,这个类的使用方式类似于字典,也具有*.keys()方法。当前示例中获取到的cookie大致为如下格式‘CPPinfo=Like%3APython%2CJava%3AC%3A\path...; CPPuser=Self’,其中%3A为冒号‘:’,%2C分号为‘;’。
Note: 浏览器发送给客户端设置cookie的头部格式为Set-Cookie,浏览器响应头部为Cookie。
在获取cookie的方法中主要对cookie作了如下的处理,
- 判断是否有cookie的存在,有则执行cookie操作;
- 根据分号对cookie进行split操作,分开cookie的每一对属性键值,并进行遍历;
- 在遍历中,检测CPP开头,获取CPP后4(3:7)位作为标签,获取8位之后为属性值;
- 没有cookie则设置为空;
- 判断cookie中的info字段是否为空,不为空则利用‘ : ’进行分割,此处由于有路径信息,可能存在多余冒号,因此在切分特征的冒号前后加入空格进行区别;
- 将信息切分得到用户名,使用语言,上传文件路径3个信息,再对其中的使用语言按逗号进行分割,从而获取所有信息。
# Read cookies from client
def getCPPCookies(self):
# environ contains the environment info, similar to dict usage
# Check whether cookie exists
if 'HTTP_COOKIE' in environ:
# environ['HTTP_COOKIE'] could be like: 'CPPinfo=Like%3APython%2CJava%3AC%3A\path...; CPPuser=Self'
# %3A -> :
# %2C -> ,
cookies = [x.strip() for x in environ['HTTP_COOKIE'].split(';')]
# cookies = ['CPPinfo=Like%3APython%2CJava%3AC%3A\path...', 'CPPuser=Self']
for eachCookie in cookies:
if len(eachCookie) > 6 and eachCookie[:3]=='CPP':
# tag -> info / user
tag = eachCookie[3:7]
try:
self.cookies[tag] = eval(unquote(eachCookie[8:]))
except (NameError, SyntaxError):
self.cookies[tag] = unquote(eachCookie[8:])
if 'info' not in self.cookies:
self.cookies['info'] = ''
if 'user' not in self.cookies:
self.cookies['user'] = ''
else:
self.cookies['info'] = self.cookies['user'] = '' if self.cookies['info'] != '':
print(self.cookies['info'])
try:
self.who, langStr, self.fn = self.cookies['info'].split(' : ')
except:
self.who = self.fn = self.cookies['info']
langStr = ' '
self.langs = langStr.split(',')
else:
self.who = self.fn = ' '
self.langs = ['Python']
定义一个显示表单的方法,用于初次页面的显示使用,首先获取服务器cookie信息,设置语言表单,再检测cookie是否加载,最后返回整个显示表单页面。
def showForm(self):
self.getCPPCookies() # Put together language checkboxes
langStr = []
for eachLang in AdvCGI.langSet:
# Add CHECKED if language name exists in cookies
langStr.append(AdvCGI.langItem % (eachLang, 'CHECKED' if eachLang in self.langs else '', eachLang)) # See if user cookie set up yet
if not ('user' in self.cookies and self.cookies['user']):
cookStatus = '<i>(cookies has not been set yet)</i>'
userCook = ''
else:
userCook = cookStatus = self.cookies['user'] print('%s%s' % (AdvCGI.header, AdvCGI.formhtml % (AdvCGI.url, cookStatus,
userCook, self.who, ''.join(langStr), self.fn)))
设置错误提示页面的html代码,并定义显示错误页面的函数,
errhtml = '''<html><head><title>
Advanced CGI Demo</title></head>
<body><h3>Error</h3>
<b>%s</b><p>
<form><input type=button value=back onclick='window.history.back()'></form>
</body></html>''' def showError(self):
print(AdvCGI.header + AdvCGI.errhtml % (self.error))
设置返回页面的html,其中html标签<li>设置列表元素,<ol>为有序列表,<ul>为无序列表,<br>换行显示。
# Some html tags:
# <li>: for list
# <ol>: odered list
# <ul>: unodered list
# <br>: newline
reshtml = '''<html><head><title>
Advanced CGI Demo</title></head>
<body><h2>Your Uploaded Data</h2>
<h3>Your cookie value is: <b>%s</b></h3>
<h3>Your name is: <b>%s</b></h3>
<h3>You can program in the following languages:</h3>
<ul>%s</ul>
<h3>Your uploaded file...<br>
Name: <i>%s</i><br>
Contents:</h3>
<pre>%s</pre>
Click <a href="%s"><b>here</b></a> to return to form.
</body></html>'''
定义一个设置cookie的方法,用于将set cookie头部发送给客户端来保存cookie,
# Tell client to store cookies
def setCPPCookies(self):
for eachCookie in self.cookies.keys():
print('Set-cookie: CPP%s=%s; path=/' % (eachCookie, quote(self.cookies[eachCookie])))
定义一个返回显示页面的方法,主要过程如下,
- 首先设置上传文件的最大限制为4k,对文件进行读取,只显示文件的前4k长度;
- 文件不存在则显示无上传文件;
- 检测用户cookie信息是否存在,返回相应的显示;
# Display results page
def doResults(self):
MAXBYTES = 4096
langList = ''.join('<li>%s<br>' % eachLang for eachLang in self.langs)
filedata = self.fp.read(MAXBYTES)
# Check file size
if len(filedata) == MAXBYTES and self.fp.read():
filedata = '%s%s' % (filedata,
'...<b><i>(file truncated due to size)</i></b>')
self.fp.close()
if filedata == '':
filedata = '<b><i>(file not given or upload error)</i></b>'
filename = self.fn # See if user cookie set up yet
if not ('user in self.cookies and self.cookies["user"]'):
cookStatus = '<i>(cookie has not been set yet)</i>'
userCook = ''
else:
userCook = cookStatus = self.cookies['user'] # Set cookies
# Use ' : ' rather than ':' to join, because filename may contains ':' like 'c:\windows\...'
self.cookies['info'] = ' : '.join((self.who, ','.join(self.langs), filename))
self.setCPPCookies() print('%s%s' % (AdvCGI.header, AdvCGI.reshtml % (cookStatus, self.who, langList,
filename, filedata, AdvCGI.url)))
最后定义一个go方法,包含了整个执行流程的控制,
- 生成表单实例,初始化cookie和错误信息为空;
- 检测form中是否有关键字,如果没有则说明表单form为空,是第一次请求,调用显示界面方法;
- 如果表单中有请求字段,则获取字段,进行处理,若person字段值为空则设置错误信息;
- 设置user的值,如果没有则填入空字符串;
- 检测勾选的语言项,如果没有勾选则设置错误信息。此处用到了多值字段,即多选时则对form[‘lang’]进行遍历后再取值;
- 查看文件信息,根据文件进行操作;
- 最后根据是否有错误信息决定返回结果页面或错误提示页面。
# Determine which page to return
def go(self):
self.cookies = {}
self.error = ''
form = FieldStorage()
# No form received
if not form.keys():
self.showForm()
return if 'person' in form:
self.who = form['person'].value.strip().title()
if self.who == '':
self.error = 'Your name is required. (blank)'
else:
self.error = 'Your name is required. (missing)' self.cookies['user'] = unquote(form['cookie'].value.strip()) if 'cookie' in form else ''
if 'lang' in form:
langData = form['lang']
if isinstance(langData, list):
self.langs = [eachLang.value for eachLang in langData]
else:
self.langs = [langData.value]
else:
self.error = 'At least one language required.' if 'upfile' in form:
upfile = form['upfile']
# form['upfile'] ->.filename for file name, .file for file data
self.fn = upfile.filename or ''
if upfile.file:
self.fp = upfile.file
else:
self.fp = StringIO('(no data)')
else:
# StringIO as data container
self.fp = StringIO('(no file)')
self.fn = '' if not self.error:
self.doResults()
else:
self.showError() if __name__ == '__main__':
page = AdvCGI()
page.go()
3 服务器网关接口WSGI / Web Server Gateway Interface
3.1 WSGI简介
PythonWeb服务器网关接口(Web Server Gateway Interface)是Python应用程序或框架和Web服务器之间的一种接口,目前被广泛接受,但是WSGI并没有官方的实现,因为WSGI更像是一个协议,遵照协议的规定,WSGI的应用程序即可以在任何服务器上运行,WSGI的是基于现存的CGI标准而设计的。
WSGI标准在PEP333中进行了定义,目前许多框架已实现,包括django框架。
3.2 WSGI使用示例
在此对WSGI的使用进行简单的介绍,主要包括两部分,app和服务器,代码如下,
App
"""
Run server
Connect to http://localhost:8000/
""" def simple_wsgi_app(environ, start_response):
status = '200 OK'
headers = [('Content-type', 'text/plain')]
# This function pass status code and header, status code/header decided inside wsgi app
start_response(status, headers)
return [b'Hello world']
上面的代码中首先定义一个简单的WSGI的app程序,对于这个app程序,需要将其传入给服务器,因此WSGI规定这个传入的app程序需要接受两个参数,
第一个参数是environ,为包含环境变量的字典,
第二个参数start_response为一个可调用对象,这个对象需要能够将HTTP的状态码(40x/50x)以及头部信息等返回给浏览器。
最后函数还应该返回一个可迭代对象,其中包含了返回给浏览器客户端的信息(HTML等)。
服务器
在完成上面的简单app后,接下来分析简单服务器的基本运行过程,这个服务器会接收两个参数,即之前提到的environ,和app参数,在其内部定义了start_response()函数,接收状态码和头部信息,最后返回一个write的可执行对象。
在服务器内会将start_response函数和environ参数传给app进行执行,最终利用write对象把信息返回给客户端。
from io import StringIO
import sys def simple_wsgi_server(app, environ=None):
sio = StringIO() def start_response(status, headers):
sio.write('Status: %s\r\n' % status)
for header in headers:
sio.write('%s: %s\r\n' % header)
return sio.write iterable = app(environ, start_response)
try:
if not sio.getvalue():
raise RuntimeError("start_response() not called by app!")
sio.write('\r\n%s\r\n' % '\r\n'.join(str(line) for line in iterable))
finally:
if hasattr(iterable, 'close') and callable(iterable.close):
iterable.close() sys.stdout.write(sio.getvalue())
sys.stdout.flush()
这里还提供了一个简单的WSGI的服务器和示例app,直接导入模块即可以进行使用。当然在app程序的选择上可以使用提供的demo_app也可以使用自定义的app。自定义的app最终返回hello world,而示例的demo_app还会返回参数信息。
from wsgiref.simple_server import make_server, demo_app
def wsgi_server(app):
httpd = make_server('', 8000, app)
print('Started app serving on port 8000...')
httpd.serve_forever() em_app = False
em_server = True
wsgi_app = demo_app if em_app else simple_wsgi_app
wsgi_server = wsgi_server if em_server else simple_wsgi_server
运行服务器后,输入url链接: http://localhost:8000/,可以得到返回的界面如下,
参考链接
《Python 核心编程 第3版》