OpenStack Liberty: Source Code Analysis of the cinder-api Startup Process, Part 2

Date: 2023-02-01 02:13:50

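In the previous post I analyzed how cinder-api loads its WSGI application and builds its routes during startup. This post covers how the WSGI server starts, how it receives a client request, and how that request is mapped to a handler method.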

Creating the WSGI server

The WSGI server is created while the WSGIService object is being initialized:

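#`cinder.service:WSGIService.__init__`
def __init__(self, name, loader=None):
    """name = `osapi_volume`

    The code that loads the `WSGI` application and builds the route
    mappings is omitted here.
    """

    ......

    """Create the `WSGI` server object. The result is a
    `cinder.wsgi.eventlet_server:Server` object.

    name = `osapi_volume`
    app = the URLMap object returned when the `WSGI` application was
          loaded and the route mappings were built
    host & port = the address the service listens on

    Construction is simple: based on the arguments and the settings in
    `cinder.conf`, it creates the worker pool (`GreenPool`) and the
    listening `socket`.
    """

    self.server = wsgi.Server(name,
                              self.app,
                              host=self.host,
                              port=self.port)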

Starting the WSGI server

Once the WSGIService object has been created, the next step is to start the service:

#`cinder.cmd.api:main`
def main():
    # Other initialization code omitted
    .......

    launcher = service.process_launcher()
    server = service.WSGIService('osapi_volume')

    # Use the process launcher to start the service in new processes,
    # then wait on it. This eventually calls `WSGIService.start`.
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()

# Continued from above: `WSGIService.start`
def start(self):
    # Here manager = None
    if self.manager:
        self.manager.init_host()

    # Start the `WSGI` server: `cinder.wsgi.eventlet_server:Server.start`
    # (analyzed in detail below)
    self.server.start()
    self.port = self.server.port

# Continued from above: `cinder.wsgi.eventlet_server:Server.start`
def start(self):
    """Start serving a WSGI application.

    Most of this function sets `socket` options such as address reuse
    and keepalive; it then uses eventlet to spawn a green thread that
    runs the server.
    """

    # Code that sets `socket` options omitted
    ......

    """Spawn a green thread to run the server. The thread function is
    eventlet.wsgi.server. The arguments were set when the server object
    was initialized:

    'site' = the URLMap object returned when the `WSGI` application was
             loaded and the route mappings were built
    'protocol' = the protocol class in use, eventlet.wsgi.HttpProtocol
    'custom_pool' = the worker pool, a GreenPool
    """

    wsgi_kwargs = {
        'func': eventlet.wsgi.server,
        'sock': dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'socket_timeout': self.client_socket_timeout,
        'keepalive': CONF.wsgi_keep_alive
    }

    self._server = eventlet.spawn(**wsgi_kwargs)

Next, let's look at what the thread function ../site-packages/eventlet.wsgi.server does:

def server(sock, site,
           log=None,
           environ=None,
           max_size=None,
           max_http_version=DEFAULT_MAX_HTTP_VERSION,
           protocol=HttpProtocol,
           server_event=None,
           minimum_chunk_size=None,
           log_x_forwarded_for=True,
           custom_pool=None,
           keepalive=True,
           log_output=True,
           log_format=DEFAULT_LOG_FORMAT,
           url_length_limit=MAX_REQUEST_LINE,
           debug=True,
           socket_timeout=None,
           capitalize_response_headers=True):
    """Start up a WSGI server handling requests from the
    supplied server socket. This function loops forever.
    The *sock* object will be closed after server exits, but
    the underlying file descriptor will remain open, so if
    you have a dup() of *sock*, it will remain usable.

    The function takes many parameters. Except for the following, which
    the caller passes in, they all keep their default values:
        'sock': dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'socket_timeout': self.client_socket_timeout,
        'keepalive': CONF.wsgi_keep_alive

    The overall logic is simple: create a server object, set its
    attributes from the arguments, then accept client connections in one
    big loop.
    """

    # First create a server object (`eventlet.wsgi.Server`) that handles
    # client requests. Construction is simple: the arguments are assigned
    # directly to member variables.
    serv = Server(sock, sock.getsockname(),
                  site, log,
                  environ=environ,
                  max_http_version=max_http_version,
                  protocol=protocol,
                  minimum_chunk_size=minimum_chunk_size,
                  log_x_forwarded_for=log_x_forwarded_for,
                  keepalive=keepalive,
                  log_output=log_output,
                  log_format=log_format,
                  url_length_limit=url_length_limit,
                  debug=debug,
                  socket_timeout=socket_timeout,
                  capitalize_response_headers=capitalize_response_headers,
                  )

    # Set up the worker pool (simple logic, omitted)
    .......

    # Main loop: accept client requests (listens on port 8776 by default)
    while is_accepting:
        # The try/except code is omitted here. On error an exception is
        # raised, and the listening `socket` is closed before the
        # abnormal exit.

        # Accept a client connection and set its timeout (900s by default)
        client_socket = sock.accept()
        client_socket[0].settimeout(serv.socket_timeout)

        # Take a green thread from the worker pool to handle the client
        # request. The thread function is `serv.process_request`.
        pool.spawn_n(serv.process_request, client_socket)

    # Exception handling
    .......

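At this point the socket listener in cinder-api is ready. Depending on the configuration, cinder-api starts one or more workers (the `workers` argument passed to `launch_service`), each running this accept loop, and every client connection is handled by a green thread taken from the pool.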
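
To make the accept loop concrete, here is a minimal sketch of the same pattern using only the standard library. It is not eventlet's code: `ThreadPoolExecutor` stands in for the `GreenPool`, and `handle_client`, `serve` and `demo` are hypothetical names I made up for the illustration. The listener binds to port 0 so the OS picks a free port.

```python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor


def handle_client(conn, addr):
    """Stand-in for serv.process_request: answer one request, then close."""
    with conn:
        conn.recv(1024)  # read (and ignore) the request bytes
        conn.sendall(b"HTTP/1.0 200 OK\r\nContent-Length: 2\r\n\r\nok")


def serve(listener, pool, max_clients):
    """Accept max_clients connections and hand each one to the pool."""
    for _ in range(max_clients):
        conn, addr = listener.accept()
        conn.settimeout(5)  # per-connection timeout, like socket_timeout
        pool.submit(handle_client, conn, addr)


def demo():
    """Start a listener, send one request to it and return the raw reply."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(5)
    port = listener.getsockname()[1]

    with ThreadPoolExecutor(max_workers=4) as pool:
        acceptor = threading.Thread(target=serve, args=(listener, pool, 1))
        acceptor.start()
        with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
            client.sendall(b"GET / HTTP/1.0\r\n\r\n")
            reply = b""
            while True:
                chunk = client.recv(1024)
                if not chunk:  # server closed the connection
                    break
                reply += chunk
        acceptor.join()
    listener.close()
    return reply
```

Calling `demo()` returns the bytes the client read back. The point of the sketch is only the division of labor: one loop accepts, and a pool worker owns each connection until it is closed.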

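Handling client requests

This section uses the `cinder list` command to show how cinder-api receives and handles a client request. As shown above, once a client connection is accepted, cinder-api takes a green thread from the worker pool to handle it. The thread function is: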

#`../site-packages/eventlet/wsgi:Server.process_request`
def process_request(self, sock_params):
    """ The actual request handling takes place in __init__, so
    we need to set minimum_chunk_size before __init__ executes
    and we don't want to modify class variable
    """

    # sock_params holds the connection `socket` (a GreenSocket object)
    # and the client address
    sock, address = sock_params
    # Create the protocol object, here an `HttpProtocol`; its `__init__`
    # is called explicitly further down
    proto = new(self.protocol)
    if self.minimum_chunk_size is not None:
        proto.minimum_chunk_size = self.minimum_chunk_size
    proto.capitalize_response_headers = self.capitalize_response_headers
    try:
        """Start handling the client request; on a timeout, close the socket.
        The class hierarchy is: HttpProtocol -> BaseHTTPRequestHandler ->
        StreamRequestHandler -> BaseRequestHandler

        The first few classes do not define `__init__` explicitly, so the
        call ends up in the base class method
        `BaseRequestHandler.__init__`, shown below.
        """

        proto.__init__(sock, address, self)
    except socket.timeout:
        # Expected exceptions are not exceptional
        sock.close()
        # similar to logging "accepted" in server()
        self.log.debug('(%s) timed out %r' % (self.pid,
                                              address))

# Continued from above: `SocketServer:BaseRequestHandler.__init__`
def __init__(self, request, client_address, server):
    """Assign the member variables:
    request         GreenSocket object for the current client connection
    client_address  client address, an ('ip', 'port') pair
    server          the eventlet.wsgi.Server object
    """

    self.request = request
    self.client_address = client_address
    self.server = server

    # Calls `HttpProtocol.setup`, which wraps the socket in separate
    # objects for reading and writing
    self.setup()

    try:
        # Calls `BaseHTTPRequestHandler.handle` to handle the request;
        # internally it calls `HttpProtocol.handle_one_request` directly
        self.handle()
    finally:
        # Flush the write buffer, then close the socket connection
        self.finish()
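
The "allocate first, run `__init__` later" trick in `process_request` is easy to miss, so here is a small sketch of it in plain Python. It assumes nothing from eventlet: `BaseHandler`, `Protocol` and this `process_request` are hypothetical stand-ins that only mirror the structure described above, where `__init__` itself drives setup, handle and finish.

```python
class BaseHandler:
    """Mimics the setup/handle/finish sequence run from __init__."""

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.calls = []
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        self.calls.append("setup")

    def handle(self):
        self.calls.append("handle")

    def finish(self):
        self.calls.append("finish")


class Protocol(BaseHandler):
    minimum_chunk_size = 4096  # class-level default

    def handle(self):
        # Record the value visible while the request is being handled.
        self.calls.append("handle(chunk=%d)" % self.minimum_chunk_size)


def process_request(protocol_cls, sock, address, server, chunk_size=None):
    proto = protocol_cls.__new__(protocol_cls)  # allocate, skip __init__
    if chunk_size is not None:
        proto.minimum_chunk_size = chunk_size   # instance attribute only
    proto.__init__(sock, address, server)       # the request is handled here
    return proto
```

Because the attribute is set on the instance before `__init__` runs, the handler sees the per-server value while the class default stays untouched, which is what the docstring of the real method asks for.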

Now let's look at what the HttpProtocol.handle_one_request method does:

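#`eventlet.wsgi:HttpProtocol.handle_one_request`
def handle_one_request(self):
    """This method mainly reads the data of the client request."""

    # Update the protocol version, e.g. 'HTTP/1.1'
    if self.server.max_http_version:
        self.protocol_version = self.server.max_http_version
    # If the read side is already closed, return immediately
    if self.rfile.closed:
        self.close_connection = 1
        return
    # try/except exception handling omitted here

    # 1. Read the client request

    """Read the request line. For `cinder list` it is:
    'GET /v2/25520b29dce346d38bc4b055c5ffbfcb/volumes/detail HTTP/1.1'
    """

    self.raw_requestline = self.rfile.readline(
        self.server.url_length_limit)
    # If the request line reaches the maximum allowed length, mark the
    # connection for closing and stop handling the request
    if len(self.raw_requestline) == self.server.url_length_limit:
        # Reply to the client with an error
        self.wfile.write(
            b"HTTP/1.0 414 Request URI Too Long\r\n"
            b"Connection: close\r\nContent-length: 0\r\n\r\n"
        )
        self.close_connection = 1
        return

    # Empty request line: mark the connection for closing and stop
    # handling the request
    if not self.raw_requestline:
        self.close_connection = 1
        return

    # try/except exception handling omitted here. On error, an error reply
    # is sent to the client, the connection is marked for closing, request
    # handling stops, and the original socket read side is restored.

    # 2. Parse the client request

    # Save the original socket read side and wrap it in a new reader used
    # for the request headers
    orig_rfile = self.rfile
    self.rfile = FileObjectForHeaders(self.rfile)
    """Parse the request line; on failure, return and stop handling the
    request. The parsed values are:
        command: 'GET'
        path:    '/v2/25520b29dce346d38bc4b055c5ffbfcb/volumes/detail'
        version: 'HTTP/1.1'
    """

    if not self.parse_request():
        return

    # Check that `content-length` is valid. If it is not, reply with an
    # error, mark the connection for closing and stop handling the request.
    content_length = self.headers.get('content-length')
    if content_length:
        try:
            int(content_length)
        except ValueError:
            self.wfile.write(
                b"HTTP/1.0 400 Bad Request\r\n"
                b"Connection: close\r\nContent-length: 0\r\n\r\n"
            )
            self.close_connection = 1
            return

    # try/except exception handling omitted here

    # 3. Handle the request

    # Build the request environment. It holds everything related to this
    # request: command, path, protocol version, client address and so on.
    self.environ = self.get_environ()
    # app = the URLMap object returned while loading the `WSGI` application
    self.application = self.server.app

    # Before handling the request, increment the outstanding request count
    self.server.outstanding_requests += 1

    # Handle the request
    self.handle_one_response()

    # After handling the request, decrement the outstanding request count
    self.server.outstanding_requests -= 1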
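
The read-and-validate steps above can be sketched without any server at all. The two helpers below are hypothetical (`parse_request_line` and `content_length_ok` are my names, not eventlet's), and the 8192 limit is just an assumed default for the sketch. They only mirror the checks described in the listing: reject an over-long request line with 414, split the line into command, path and version, and reject a non-numeric `content-length`.

```python
def parse_request_line(raw, url_length_limit=8192):
    """Return (status, fields); status is 200, 400 or 414."""
    # A line that fills the whole read limit is treated as too long.
    if len(raw) >= url_length_limit:
        return 414, None
    parts = raw.decode("latin-1").rstrip("\r\n").split()
    if len(parts) != 3 or not parts[2].startswith("HTTP/"):
        return 400, None
    command, path, version = parts
    return 200, {"command": command, "path": path, "version": version}


def content_length_ok(headers):
    """True if content-length is absent or parses as an integer."""
    value = headers.get("content-length")
    if not value:
        return True
    try:
        int(value)
    except ValueError:
        return False
    return True
```

For the `cinder list` request line this yields the command `GET`, the `/v2/.../volumes/detail` path and the version `HTTP/1.1`, the same three values the docstring in the listing names.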

Next, let's see how cinder-api maps the client request to a concrete handler method:

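#`eventlet.wsgi:HttpProtocol.handle_one_response`
def handle_one_response(self):
    """This method is long. The non-essential code is omitted in the
    analysis below: variable initialization, exception handling, inner
    functions and so on."""

    """`start_response` is an inner function defined by
    `handle_one_response`. self.application = paste.urlmap.URLMap, the
    object returned while loading the `WSGI` application. It is a
    `callable` object, so what actually runs here is its __call__ method.
    When the request has been handled, `start_response` is called back.
    In that function: if handling failed with an exception, the exception
    is raised; if it succeeded, the first character of each response
    header name is capitalized, and finally another inner function,
    `write`, is returned to the caller.

    In our `cinder list` example, on success `result` holds a list with
    the volume information.

    Let's first look at how `paste.urlmap.URLMap.__call__` is implemented.
    """

    result = self.application(self.environ, start_response)

    """The code that follows processes the result. It contains many
    conditions and a lot of exception handling, so it is not shown here;
    interested readers can read the source. In short, it:

    sends the reply to the client through the inner function write
    (minimum_write_chunk_size = 4096 each time) and writes the log
    """

    ......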
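
The `start_response` / `write` handshake is part of the WSGI calling convention, so it can be sketched in a few lines of plain Python. This is not eventlet's implementation: `application` is a hypothetical stand-in for the URLMap object, `run_once` is a made-up driver, and the way header names are capitalized here (each dash-separated part) is my assumption about what "capitalizing" means.

```python
def application(environ, start_response):
    """A tiny WSGI app standing in for the URLMap object."""
    body = b'{"volumes": []}'
    start_response("200 OK", [("content-type", "application/json"),
                              ("content-length", str(len(body)))])
    return [body]


def run_once(app, environ):
    """Call app the way a server would and collect what it sends."""
    sent = {"status": None, "headers": None, "body": []}

    def write(data):
        sent["body"].append(data)

    def start_response(status, headers, exc_info=None):
        # Capitalize header names, e.g. content-type -> Content-Type.
        sent["status"] = status
        sent["headers"] = [
            ("-".join(part.capitalize() for part in name.split("-")), value)
            for name, value in headers
        ]
        return write

    result = app(environ, start_response)
    try:
        for chunk in result:  # the server writes each chunk of the result
            write(chunk)
    finally:
        if hasattr(result, "close"):
            result.close()
    return sent
```

The application never touches the socket. It reports status and headers through the callback and returns an iterable of body chunks, and the server decides how to put them on the wire.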



# Continued from above: `paste.urlmap.URLMap.__call__`
def __call__(self, environ, start_response):
    """First parse the parameters:
    host & port  the host and port the client addressed the request to
    path_info    the path of this request:
                 '/v2/25520b29dce346d38bc4b055c5ffbfcb/volumes/detail'
    """

    host = environ.get('HTTP_HOST',
                       environ.get('SERVER_NAME')).lower()
    if ':' in host:
        host, port = host.split(':', 1)
    else:
        if environ['wsgi.url_scheme'] == 'http':
            port = '80'
        else:
            port = '443'
    path_info = environ.get('PATH_INFO')
    path_info = self.normalize_url(path_info, False)[1]

    # Remember the three applications loaded together with the `WSGI`
    # application `osapi_volume`? The <path, app> mappings are stored in
    # URLMap.applications; the loop below uses the path to find the
    # concrete application.
    for (domain, app_url), app in self.applications:
        if domain and domain != host and domain != host+':'+port:
            continue
        # In our case this finds the application that `/v2` points to
        if (path_info == app_url
                or path_info.startswith(app_url + '/')):
            environ['SCRIPT_NAME'] += app_url
            environ['PATH_INFO'] = path_info[len(app_url):]
            # `app` is the `apiv2` application wrapped layer by layer in
            # filters (see the previous post if this is unclear).
            # All filters are `callable` objects, so what gets called is
            # their `__call__` method. The analysis below skips the
            # filters and goes straight to how `apiv2` handles the request.
            return app(environ, start_response)
    # If no application matches, `not_found_application` handles the
    # request. It is loaded while loading `osapi_volume`, but
    # `osapi_volume` does not define a `not_found_application`, so the
    # `URLMap.not_found_application` method is called and returns an
    # `http not found` error to the client.
    environ['paste.urlmap_object'] = self
    return self.not_found_application(environ, start_response)
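
The prefix dispatch above boils down to a few lines. The sketch below is a simplified stand-in, not paste's code: `PrefixMap`, `make_app` and `not_found` are hypothetical names, domain matching is left out, and the root application is registered under the empty prefix.

```python
def make_app(name):
    """Build a WSGI app that reports which prefix it was reached through."""
    def app(environ, start_response):
        start_response("200 OK", [("Content-Type", "text/plain")])
        text = "%s|%s|%s" % (name, environ["SCRIPT_NAME"], environ["PATH_INFO"])
        return [text.encode("ascii")]
    return app


def not_found(environ, start_response):
    start_response("404 Not Found", [("Content-Type", "text/plain")])
    return [b"not found"]


class PrefixMap:
    """Dispatch on the URL prefix, longest prefix first."""

    def __init__(self, applications):
        self.applications = sorted(applications.items(),
                                   key=lambda item: -len(item[0]))

    def __call__(self, environ, start_response):
        path_info = environ.get("PATH_INFO", "")
        for app_url, app in self.applications:
            if path_info == app_url or path_info.startswith(app_url + "/"):
                # Move the matched prefix from PATH_INFO to SCRIPT_NAME.
                environ["SCRIPT_NAME"] = environ.get("SCRIPT_NAME", "") + app_url
                environ["PATH_INFO"] = path_info[len(app_url):]
                return app(environ, start_response)
        return not_found(environ, start_response)
```

After dispatch the inner application no longer sees `/v2` in `PATH_INFO`. The prefix has moved to `SCRIPT_NAME`, which is why the route matching later on works with `/{project_id}/volumes/detail` only.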

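Continuing the analysis: as the previous post showed, loading the apiv2 application instantiates an APIRouter object. Its inheritance chain is:
api.v2.router.APIRouter ->
api.openstack.__init__.APIRouter ->
cinder.wsgi.common.Router. These classes also implement __call__, so they are
callable objects as well: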

def __call__(self, req):
    """Route the incoming request to a controller based on
    self.map.

    If no match, return a 404.
    """

    # This is a `RoutesMiddleware` instance, which is itself a `callable`
    # object. `cinder-api` uses it to store and look up <path, action>
    # mappings.
    return self._router

# Continued from above: `routes.middleware.RoutesMiddleware.__call__`
def __call__(self, environ, start_response):
    """The environ and start_response arguments are the two passed in at
    the very beginning, except that environ has since been processed by
    the other filters.
    """

    # The first half of the method validates the input arguments and is
    # not shown here

    # singleton is set to True at initialization.
    # Look up the application by request path.
    if self.singleton:
        # Create a `_RequestConfig` object
        config = request_config()
        # mapper is the `ProjectMapper` object holding the route mappings
        config.mapper = self.mapper
        """Assigning config.environ calls `self.mapper.routematch` with
        the request path from `environ` to do the path mapping, and then
        sets config.mapper_dict and config.route. In our `cinder list`
        example:
        config.mapper_dict = {'action': u'detail',
            'controller': cinder.api.openstack.wsgi.Resource,
            'project_id': u'25520b29dce346d38bc4b055c5ffbfcb'}
        config.route = routes.route.Route object

        Notice how `config.mapper_dict` resembles the values used in
        `APIRouter._setup_routes` when the `detail` route was set up.
        Readers who do not remember can check the code.
        """
        config.environ = environ
        match = config.mapper_dict
        route = config.route
    else:
        results = self.mapper.routematch(environ=environ)
        if results:
            match, route = results[0], results[1]
        else:
            match = route = None

    if old_method:
        environ['REQUEST_METHOD'] = old_method

    ......

    # Update the request environment environ
    url = URLGenerator(self.mapper, environ)
    environ['wsgiorg.routing_args'] = ((url), match)
    environ['routes.route'] = route
    environ['routes.url'] = url

    # Handle redirect routes
    if route and route.redirect:
        route_name = '_redirect_%s' % id(route)
        location = url(route_name, **match)
        log.debug("Using redirect route, redirect to '%s' with"
                  "status code: %s", location, route.redirect_status)
        start_response(route.redirect_status,
                       [('Content-Type', 'text/plain; charset=utf8'),
                        ('Location', location)])
        return []

    # If the route included a path_info attribute and it should
    # be used to alter the environ, we'll pull it out
    # (update the path)
    if self.path_info and 'path_info' in match:
        oldpath = environ['PATH_INFO']
        newpath = match.get('path_info') or ''
        environ['PATH_INFO'] = newpath
        if not environ['PATH_INFO'].startswith('/'):
            environ['PATH_INFO'] = '/' + environ['PATH_INFO']
        environ['SCRIPT_NAME'] += re.sub(
            r'^(.*?)/' + re.escape(newpath) + '$', r'\1', oldpath)

    # app was set when `RoutesMiddleware` was initialized; it is the
    # `Router._dispatch` method
    response = self.app(environ, start_response)
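
To show what "route matching" produces, here is a rough stand-in written with plain regular expressions. It does not use the routes library. `ROUTES`, `VolumeResource` and this `routematch` are hypothetical, and only two routes are listed. What carries over from the analysis is the shape of the result: a dict with the controller, the action and the variables captured from the path.

```python
import re


class VolumeResource:
    """Hypothetical placeholder for the Resource wrapping the controller."""


VOLUMES = VolumeResource()

# Order matters: the literal 'detail' route must come before the {id} route,
# otherwise 'detail' would be captured as a volume id.
ROUTES = [
    ("GET", re.compile(r"^/(?P<project_id>[^/]+)/volumes/detail$"),
     {"controller": VOLUMES, "action": "detail"}),
    ("GET", re.compile(r"^/(?P<project_id>[^/]+)/volumes/(?P<id>[^/]+)$"),
     {"controller": VOLUMES, "action": "show"}),
]


def routematch(environ):
    """Return the match dict for the request, or None if nothing matches."""
    for method, pattern, defaults in ROUTES:
        if environ["REQUEST_METHOD"] != method:
            continue
        found = pattern.match(environ["PATH_INFO"])
        if found:
            match = dict(defaults)
            match.update(found.groupdict())  # add project_id, id, ...
            return match
    return None
```

A dispatcher then only has to read `match['controller']` and `match['action']`, which is the job `Router._dispatch` and `Resource.__call__` do in the real code.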

# Continued from above: `Router._dispatch`
def _dispatch(req):
    """Dispatch the request to the appropriate controller.
    Called by self._router after matching the incoming request
    to a route and putting the information into req.environ.
    Either returns 404 or the routed WSGI app's response.
    """
    # Get the `controller` that handles this request from the request
    # environment
    match = req.environ['wsgiorg.routing_args'][1]
    if not match:
        return webob.exc.HTTPNotFound()
    # This is a `cinder.api.openstack.wsgi.Resource` whose inner
    # controller is cinder.api.v2.volumes.VolumeController. Note that
    # `Resource` is also a `callable` object.
    app = match['controller']
    return app

# Continued from above: `cinder.api.openstack.wsgi.Resource.__call__`
def __call__(self, request):
    # Identify the action, its arguments, and the requested
    # content type
    # Get the request action and arguments:
    # {'action': u'detail',
    #  'project_id': u'25520b29dce346d38bc4b055c5ffbfcb'}
    action_args = self.get_action_args(request.environ)
    action = action_args.pop('action', None)
    # Get the request body; here content_type = None and body = ''
    content_type, body = self.get_body(request)
    # Determine the response content type: 'application/json'
    accept = request.best_match_content_type()

    # Find the method that handles the request and run it
    return self._process_stack(request, action, action_args,
                               content_type, body, accept)

def _process_stack(self, request, action, action_args,
                   content_type, body, accept):
    """Implement the processing stack."""

    """Exception handling omitted. Look up the implementing method; the
    result looks like this:
    meth = <bound method VolumeController.detail of
        <cinder.api.v2.volumes.VolumeController object at 0x3567b10>>

    extensions = [
        <bound method VolumeTenantAttributeController.detail of
         <cinder.api.contrib.volume_tenant_attribute.VolumeTenantAttributeController
          object at 0x42b63d0>>,
        <bound method VolumeReplicationController.detail of
         <cinder.api.contrib.volume_replication.VolumeReplicationController
          object at 0x42be850>>,
        <bound method VolumeHostAttributeController.detail of
         <cinder.api.contrib.volume_host_attribute.VolumeHostAttributeController
          object at 0x454a290>>,
        <bound method VolumeMigStatusAttributeController.detail of
         <cinder.api.contrib.volume_mig_status_attribute.VolumeMigStatusAttributeController
          object at 0x454a1d0>>,
        <bound method VolumeImageMetadataController.detail of
         <cinder.api.contrib.volume_image_metadata.VolumeImageMetadataController
          object at 0x454ad10>>]

    At this point we know which method handles the `cinder list` request.
    How the method is found is covered in the analysis of get_method
    below.
    """

    meth, extensions = self.get_method(request, action,
                                       content_type, body)

    # Exception handling omitted. Deserialize the request body; here
    # content_type = None
    if content_type:
        contents = self.deserialize(meth, content_type, body)
    else:
        contents = {}

    # Update the action args
    action_args.update(contents)

    # If the project id in the URL does not match the one in the request
    # context, return a 400 fault
    project_id = action_args.pop("project_id", None)
    context = request.environ.get('cinder.context')
    if (context and project_id and (project_id !=
                                    context.project_id)):
        msg = _("Malformed request url")
        return Fault(webob.exc.HTTPBadRequest(explanation=msg))

    # Run pre-processing extensions
    # Pre-process the request through `extensions`.
    # If all goes well, response = None and `post` is the `extensions`
    # list in reverse order. After the request has been handled,
    # post_process_extensions is called with `post` as an argument.
    response, post = self.pre_process_extensions(extensions,
                                                 request,
                                                 action_args)
    # If there was no error, handle the request
    if not response:
        try:
            with ResourceExceptionHandler():
                # Call meth to handle the request, that is:
                # cinder.api.v2.volumes.VolumeController.detail
                # Its internals are not covered here; interested readers
                # can read the source. If all goes well, it returns the
                # current list of volumes.
                action_result = self.dispatch(meth, request,
                                              action_args)
        except Fault as ex:
            response = ex

    # Build the reply
    if not response:
        # No exceptions; convert action_result into a
        # ResponseObject
        resp_obj = None
        if type(action_result) is dict or action_result is None:
            resp_obj = ResponseObject(action_result)
        elif isinstance(action_result, ResponseObject):
            resp_obj = action_result
        else:
            response = action_result

        # Run post-processing extensions
        if resp_obj:
            _set_request_id_header(request, resp_obj)
            # Do a preserialize to set up the response object
            serializers = getattr(meth, 'wsgi_serializers', {})
            resp_obj._bind_method_serializers(serializers)
            if hasattr(meth, 'wsgi_code'):
                resp_obj._default_code = meth.wsgi_code
            resp_obj.preserialize(accept,
                                  self.default_serializers)

            # Process post-processing extensions
            # Call the extensions in post one by one to process the
            # request again. Their internals are not analyzed here either;
            # interested readers can read the source.
            response = self.post_process_extensions(post,
                                                    resp_obj,
                                                    request,
                                                    action_args)
        # Serialize the response body
        if resp_obj and not response:
            response = resp_obj.serialize(request, accept,
                                          self.default_serializers)
    .......

    # The response is returned layer by layer and eventually reaches the
    # inner function `start_response` defined in `handle_one_response`
    # above
    return response
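
The pre/post extension mechanism is worth a small sketch. The code below is a simplified model, not cinder's implementation: it assumes that an extension written as a generator runs its part before `yield` during pre-processing and the rest during post-processing, that plain functions only post-process, and that `post` is walked in reverse order. The names `pre_process`, `post_process`, `add_tenant` and `audit` are all hypothetical, and a dict stands in for the response object.

```python
import inspect


def pre_process(extensions, request, action_args):
    """Run the pre stage; return (early_response, post_list)."""
    post = []
    for ext in extensions:
        if inspect.isgeneratorfunction(ext):
            gen = ext(req=request, **action_args)
            response = next(gen)  # runs the code before `yield`
            if response:
                return response, []
            post.append(gen)      # resume it later for the post stage
        else:
            post.append(ext)      # plain functions only post-process
    return None, list(reversed(post))


def post_process(post, resp_obj, request, action_args):
    """Run the post stage; return a response only if an extension made one."""
    for ext in post:
        if inspect.isgenerator(ext):
            try:
                response = ext.send(resp_obj)  # runs the code after `yield`
            except StopIteration:
                continue  # generator finished without a response
        else:
            response = ext(req=request, resp_obj=resp_obj, **action_args)
        if response:
            return response
    return None


def add_tenant(req, resp_obj):
    """Post-only extension: decorate the response."""
    resp_obj["tenant"] = "demo"


def audit(req):
    """Generator extension with a pre stage and a post stage."""
    req["log"].append("pre")
    resp_obj = yield
    req["log"].append("post")
    resp_obj["audited"] = True
```

With `[add_tenant, audit]` as the extension list, `audit` logs `pre` before the controller method would run, and afterwards the reversed list resumes `audit` first and then calls `add_tenant`, each adding its own key to the response dict.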

# Continued from above: looking up the implementing method
def get_method(self, request, action, content_type, body):
    """Look up the action-specific method and its
    extensions."""

    # Look up the method
    # Get the method from the controller's attributes.
    # Here it is: cinder.api.v2.volumes.VolumeController.detail
    try:
        if not self.controller:
            meth = getattr(self, action)
        else:
            meth = getattr(self.controller, action)
    except AttributeError as e:
        with excutils.save_and_reraise_exception(e) as ctxt:
            if (not self.wsgi_actions or action not in
                    ['action',
                     'create',
                     'delete',
                     'update']):
                LOG.exception(_LE('Get method error.'))
            else:
                ctxt.reraise = False
    else:
        # If the method has extension methods, return them as well
        return meth, self.wsgi_extensions.get(action, [])

    # Look the method up among the resource extensions and controller
    # extensions
    if action == 'action':
        # OK, it's an action; figure out which action...
        mtype = _MEDIA_TYPE_MAP.get(content_type)
        action_name = self.action_peek[mtype](body)
        LOG.debug("Action body: %s", body)
    else:
        action_name = action

    # Look up the action method
    return (self.wsgi_actions[action_name],
            self.wsgi_action_extensions.get(action_name, []))
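
The lookup in `get_method` is ordinary `getattr` dispatch with a fallback table. Below is a pared-down sketch under my own assumptions: the `Resource` and `VolumeController` classes are hypothetical miniatures, the request body is already a dict, and the action name is taken to be its first key instead of going through `action_peek`.

```python
class VolumeController:
    """Hypothetical controller with a single action method."""

    def detail(self, req):
        return {"volumes": []}


class Resource:
    def __init__(self, controller):
        self.controller = controller
        self.wsgi_extensions = {}         # action -> list of extensions
        self.wsgi_actions = {}            # action name -> method
        self.wsgi_action_extensions = {}  # action name -> list of extensions

    def get_method(self, action, body=None):
        # First try a plain attribute lookup on the controller.
        meth = getattr(self.controller, action, None)
        if meth is not None:
            return meth, self.wsgi_extensions.get(action, [])
        # Otherwise fall back to the registered actions.
        if action == "action":
            action_name = next(iter(body))  # assumed: first key names the action
        else:
            action_name = action
        return (self.wsgi_actions[action_name],
                self.wsgi_action_extensions.get(action_name, []))
```

For `cinder list` only the first branch is taken: `detail` is an attribute of the controller, so the bound method comes back together with whatever extensions were registered for `detail`.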

That completes the analysis of the code cinder-api runs to handle one client request. To wrap up, here is the flow for handling the cinder list request:

    `eventlet.wsgi.Server.process_request`
        |
        V
    `HttpProtocol.__init__`
        |
        V
    `HttpProtocol.handle`
        |
        V
    `HttpProtocol.handle_one_request`
        |
        V
    `HttpProtocol.rfile.readline`
        |
        V
    `HttpProtocol.parse_request`
        |
        V
    `handle_one_response()` -> `start_response` -> `write`
        |   ^
        V   |
    `paste.urlmap.URLMap.__call__`
        |   ^
        V   |
    `*.__call__` (the `__call__` of each filter)
        |   ^
        V   |
    `cinder.wsgi.common.Router.__call__`
        |   ^
        V   |
    `routes.middleware.RoutesMiddleware.__call__`
        |   ^
        V   |
    `cinder.wsgi.common.Router._dispatch`
        |   ^
        V   |
    `cinder.api.openstack.wsgi.Resource.__call__`
        |   ^
        V   |
    `cinder.api.openstack.wsgi.Resource._process_stack`
        |   ^
        V   |
    `cinder.api.v2.volumes.VolumeController.detail`

End of article.