web.py 学习(二)Worker

时间:2023-12-23 15:02:20

Rocket Server 启动一个线程监听客户端的连接,收到连接将连接放置到队列中。线程池中的Worker会以这个连接进行初始化。Rocket中Worker的基类是:

class Worker(Thread):
"""The Worker class is a base class responsible for receiving connections
and (a subclass) will run an application to process the the connection """ def __init__(self,
app_info,
active_queue,
monitor_queue,
*args,
**kwargs): Thread.__init__(self, *args, **kwargs) # Instance Variables
self.app_info = app_info
self.active_queue = active_queue
self.monitor_queue = monitor_queue self.size = 0
self.status = "200 OK"
self.closeConnection = True
self.request_line = ""
self.protocol = 'HTTP/1.1' # Request Log
self.req_log = logging.getLogger('Rocket.Requests')
self.req_log.addHandler(NullHandler()) # Error Log
self.err_log = logging.getLogger('Rocket.Errors.' + self.getName())
self.err_log.addHandler(NullHandler()) def _handleError(self, typ, val, tb):
if typ == SSLError:
if 'timed out' in str(val.args[0]):
typ = SocketTimeout
if typ == SocketTimeout:
if __debug__:
self.err_log.debug('Socket timed out')
self.monitor_queue.put(self.conn)
return True
if typ == SocketClosed:
self.closeConnection = True
if __debug__:
self.err_log.debug('Client closed socket')
return False
if typ == BadRequest:
self.closeConnection = True
if __debug__:
self.err_log.debug('Client sent a bad request')
return True
if typ == socket.error:
self.closeConnection = True
if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
if __debug__:
self.err_log.debug('Ignorable socket Error received...'
'closing connection.')
return False
else:
self.status = "999 Utter Server Failure"
tb_fmt = traceback.format_exception(typ, val, tb)
self.err_log.error('Unhandled Error when serving '
'connection:\n' + '\n'.join(tb_fmt))
return False self.closeConnection = True
tb_fmt = traceback.format_exception(typ, val, tb)
self.err_log.error('\n'.join(tb_fmt))
self.send_response('500 Server Error')
return False def run(self):
if __debug__:
self.err_log.debug('Entering main loop.') # Enter thread main loop
while True:
conn = self.active_queue.get() if not conn:
# A non-client is a signal to die
if __debug__:
self.err_log.debug('Received a death threat.')
return conn if isinstance(conn, tuple):
conn = Connection(*conn) self.conn = conn if conn.ssl != conn.secure:
self.err_log.info('Received HTTP connection on HTTPS port.')
self.send_response('400 Bad Request')
self.closeConnection = True
conn.close()
continue
else:
if __debug__:
self.err_log.debug('Received a connection.')
self.closeConnection = False # Enter connection serve loop
while True:
if __debug__:
self.err_log.debug('Serving a request')
try:
self.run_app(conn)
except:
exc = sys.exc_info()
handled = self._handleError(*exc)
if handled:
break
finally:
if self.request_line:
log_info = dict(client_ip=conn.client_addr,
time=datetime.now().strftime('%c'),
status=self.status.split(' ')[0],
size=self.size,
request_line=self.request_line)
self.req_log.info(LOG_LINE % log_info) if self.closeConnection:
try:
conn.close()
except:
self.err_log.error(str(traceback.format_exc())) break def run_app(self, conn):
# Must be overridden with a method reads the request from the socket
# and sends a response.
self.closeConnection = True
raise NotImplementedError('Overload this method!') def send_response(self, status):
stat_msg = status.split(' ', 1)[1]
msg = RESPONSE % (self.protocol,
status,
len(stat_msg),
'text/plain',
stat_msg)
try:
self.conn.sendall(b(msg))
except socket.timeout:
self.closeConnection = True
msg = 'Tried to send "%s" to client but received timeout error'
self.err_log.error(msg % status)
except socket.error:
self.closeConnection = True
msg = 'Tried to send "%s" to client but received socket error'
self.err_log.error(msg % status) def read_request_line(self, sock_file):
self.request_line = ''
try:
# Grab the request line
d = sock_file.readline()
if PY3K:
d = d.decode('ISO-8859-1') if d == '\r\n':
# Allow an extra NEWLINE at the beginning per HTTP 1.1 spec
if __debug__:
self.err_log.debug('Client sent newline') d = sock_file.readline()
if PY3K:
d = d.decode('ISO-8859-1')
except socket.timeout:
raise SocketTimeout('Socket timed out before request.')
except TypeError:
raise SocketClosed(
'SSL bug caused closure of socket. See '
'"https://groups.google.com/d/topic/web2py/P_Gw0JxWzCs".') d = d.strip() if not d:
if __debug__:
self.err_log.debug(
'Client did not send a recognizable request.')
raise SocketClosed('Client closed socket.') self.request_line = d # NOTE: I've replaced the traditional method of procedurally breaking
# apart the request line with a (rather unsightly) regular expression.
# However, Java's regexp support sucks so bad that it actually takes
# longer in Jython to process the regexp than procedurally. So I've
# left the old code here for Jython's sake...for now.
if IS_JYTHON:
return self._read_request_line_jython(d) match = re_REQUEST_LINE.match(d) if not match:
self.send_response('400 Bad Request')
raise BadRequest req = match.groupdict()
for k, v in req.iteritems():
if not v:
req[k] = ""
if k == 'path':
req['path'] = r'%2F'.join(
[unquote(x) for x in re_SLASH.split(v)]) self.protocol = req['protocol']
return req def _read_request_line_jython(self, d):
d = d.strip()
try:
method, uri, proto = d.split(' ')
if not proto.startswith('HTTP') or \
proto[-3:] not in ('1.0', '1.1') or \
method not in HTTP_METHODS:
self.send_response('400 Bad Request')
raise BadRequest
except ValueError:
self.send_response('400 Bad Request')
raise BadRequest req = dict(method=method, protocol=proto)
scheme = ''
host = ''
if uri == '*' or uri.startswith('/'):
path = uri
elif '://' in uri:
scheme, rest = uri.split('://')
host, path = rest.split('/', 1)
path = '/' + path
else:
self.send_response('400 Bad Request')
raise BadRequest query_string = ''
if '?' in path:
path, query_string = path.split('?', 1) path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)]) req.update(path=path,
query_string=query_string,
scheme=scheme.lower(),
host=host)
return req def read_headers(self, sock_file):
try:
headers = dict()
lname = None
lval = None
while True:
l = sock_file.readline() if PY3K:
try:
l = str(l, 'ISO-8859-1')
except UnicodeDecodeError:
self.err_log.warning(
'Client sent invalid header: ' + repr(l)) if l.strip().replace('\0', '') == '':
break if l[0] in ' \t' and lname:
# Some headers take more than one line
lval += ' ' + l.strip()
else:
# HTTP header values are latin-1 encoded
l = l.split(':', 1)
# HTTP header names are us-ascii encoded lname = l[0].strip().upper().replace('-', '_')
lval = l[-1].strip() headers[str(lname)] = str(lval) except socket.timeout:
raise SocketTimeout("Socket timed out before request.") return headers

Worker继承自Thread,线程启动会执行run方法,在这个方法中做了以下的事情:

1、从连接队列中取得con对象

2、执行方法run_app(con),这个方法由子类实现

Rocket中Worker子类是:

class WSGIWorker(Worker):
def __init__(self, *args, **kwargs):
"""Builds some instance variables that will last the life of the
thread."""
Worker.__init__(self, *args, **kwargs) if isinstance(self.app_info, dict):
multithreaded = self.app_info.get('max_threads') != 1
else:
multithreaded = False
self.base_environ = dict(
{'SERVER_SOFTWARE': self.app_info['server_software'],
'wsgi.multithread': multithreaded,
})
self.base_environ.update(BASE_ENV) # Grab our application
self.app = self.app_info.get('wsgi_app') if not hasattr(self.app, "__call__"):
raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app)) # Enable futures
if has_futures and self.app_info.get('futures'):
executor = self.app_info['executor']
self.base_environ.update({"wsgiorg.executor": executor,
"wsgiorg.futures": executor.futures}) def build_environ(self, sock_file, conn):
""" Build the execution environment. """
# Grab the request line
request = self.read_request_line(sock_file) # Copy the Base Environment
environ = self.base_environ.copy() # Grab the headers
for k, v in self.read_headers(sock_file).iteritems():
environ[str('HTTP_' + k)] = v # Add CGI Variables
environ['REQUEST_METHOD'] = request['method']
environ['PATH_INFO'] = request['path']
environ['SERVER_PROTOCOL'] = request['protocol']
environ['SERVER_PORT'] = str(conn.server_port)
environ['REMOTE_PORT'] = str(conn.client_port)
environ['REMOTE_ADDR'] = str(conn.client_addr)
environ['QUERY_STRING'] = request['query_string']
if 'HTTP_CONTENT_LENGTH' in environ:
environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
if 'HTTP_CONTENT_TYPE' in environ:
environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE'] # Save the request method for later
self.request_method = environ['REQUEST_METHOD'] # Add Dynamic WSGI Variables
if conn.ssl:
environ['wsgi.url_scheme'] = 'https'
environ['HTTPS'] = 'on'
try:
peercert = conn.socket.getpeercert(binary_form=True)
environ['SSL_CLIENT_RAW_CERT'] = \
peercert and ssl.DER_cert_to_PEM_cert(peercert)
except Exception:
print sys.exc_info()[1]
else:
environ['wsgi.url_scheme'] = 'http' if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
environ['wsgi.input'] = ChunkedReader(sock_file)
else:
environ['wsgi.input'] = sock_file return environ def send_headers(self, data, sections):
h_set = self.header_set # Does the app want us to send output chunked?
self.chunked = h_set.get('Transfer-Encoding', '').lower() == 'chunked' # Add a Date header if it's not there already
if not 'Date' in h_set:
h_set['Date'] = formatdate(usegmt=True) # Add a Server header if it's not there already
if not 'Server' in h_set:
h_set['Server'] = HTTP_SERVER_SOFTWARE if 'Content-Length' in h_set:
self.size = int(h_set['Content-Length'])
else:
s = int(self.status.split(' ')[0])
if (s < 200 or s not in (204, 205, 304)) and not self.chunked:
if sections == 1 or self.protocol != 'HTTP/1.1':
# Add a Content-Length header because it's not there
self.size = len(data)
h_set['Content-Length'] = str(self.size)
else:
# If they sent us more than one section, we blow chunks
h_set['Transfer-Encoding'] = 'Chunked'
self.chunked = True
if __debug__:
self.err_log.debug('Adding header...'
'Transfer-Encoding: Chunked') if 'Connection' not in h_set:
# If the application did not provide a connection header,
# fill it in
client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
# HTTP = 1.1 defaults to keep-alive connections
if client_conn:
h_set['Connection'] = client_conn
else:
h_set['Connection'] = 'keep-alive'
else:
# HTTP < 1.1 supports keep-alive but it's quirky
# so we don't support it
h_set['Connection'] = 'close' # Close our connection if we need to.
self.closeConnection = h_set.get('Connection', '').lower() == 'close' # Build our output headers
header_data = HEADER_RESPONSE % (self.status, str(h_set)) # Send the headers
if __debug__:
self.err_log.debug('Sending Headers: %s' % repr(header_data))
self.conn.sendall(b(header_data))
self.headers_sent = True def write_warning(self, data, sections=None):
self.err_log.warning('WSGI app called write method directly. This is '
'deprecated behavior. Please update your app.')
return self.write(data, sections) def write(self, data, sections=None):
""" Write the data to the output socket. """ if self.error[0]:
self.status = self.error[0]
data = b(self.error[1]) if not self.headers_sent:
self.send_headers(data, sections) if self.request_method != 'HEAD':
try:
if self.chunked:
self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data)))
else:
self.conn.sendall(data)
except socket.timeout:
self.closeConnection = True
except socket.error:
# But some clients will close the connection before that
# resulting in a socket error.
self.closeConnection = True def start_response(self, status, response_headers, exc_info=None):
""" Store the HTTP status and headers to be sent when self.write is
called. """
if exc_info:
try:
if self.headers_sent:
# Re-raise original exception if headers sent
# because this violates WSGI specification.
raise
finally:
exc_info = None
elif self.header_set:
raise AssertionError("Headers already set!") if PY3K and not isinstance(status, str):
self.status = str(status, 'ISO-8859-1')
else:
self.status = status
# Make sure headers are bytes objects
try:
self.header_set = Headers(response_headers)
except UnicodeDecodeError:
self.error = ('500 Internal Server Error',
'HTTP Headers should be bytes')
self.err_log.error('Received HTTP Headers from client that contain'
' invalid characters for Latin-1 encoding.') return self.write_warning def run_app(self, conn):
self.size = 0
self.header_set = Headers([])
self.headers_sent = False
self.error = (None, None)
self.chunked = False
sections = None
output = None if __debug__:
self.err_log.debug('Getting sock_file') # Build our file-like object
if PY3K:
sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE)
else:
sock_file = conn.makefile(BUF_SIZE) try:
# Read the headers and build our WSGI environment
self.environ = environ = self.build_environ(sock_file, conn) # Handle 100 Continue
if environ.get('HTTP_EXPECT', '') == '100-continue':
res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
conn.sendall(b(res)) # Send it to our WSGI application
output = self.app(environ, self.start_response) if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
self.error = ('500 Internal Server Error',
'WSGI applications must return a list or '
'generator type.') if hasattr(output, '__len__'):
sections = len(output) for data in output:
# Don't send headers until body appears
if data:
self.write(data, sections) if not self.headers_sent:
# Send headers if the body was empty
self.send_headers('', sections) if self.chunked and self.request_method != 'HEAD':
# If chunked, send our final chunk length
self.conn.sendall(b('0\r\n\r\n')) # Don't capture exceptions here. The Worker class handles
# them appropriately.
finally:
if __debug__:
self.err_log.debug('Finally closing output and sock_file') if hasattr(output, 'close'):
output.close() sock_file.close() # Monolithic build...end of module: rocket/methods/wsgi.py

在runapp中主要执行了下面的代码:

self.environ = environ = self.build_environ(sock_file, conn)
output = self.app(environ, self.start_response)

第一行是读取客户端的请求,并取得所有的请求头。

第二行是调用请求对应的方法,并取得输出。

app是worker初始化的时候传入的。