OpenStack Trove组件WSGI和RPC调用流程

时间:2021-10-25 00:21:43

        本文是基于OpenStack Mitaka版本的Trove组件对WSGI和RPC调用流程进行分析。而OpenStack其他组件的处理流程也都大同小异,因此了解了trove的WSGI和RPC调用流程,学习OpenStack其他组件就可以举一反三了。

1. trove-api服务WSGI调用过程分析

1.1 WSGI配置文件各配置项含义

        Trove组件中的wsgi配置文件默认为/etc/trove目录下的api-paste.ini文件,在启动trove-api服务时,将加载该文件进行初始化。在该文件中,主要的配置项如下:

        composite:定义Trove服务的入口,M版本中主要包含获取版本信息服务入口与troveapi v1.0服务入口。use则指定了根据请求路径获取不同版本app的规则。

        filter:定义了Trove API中使用的过滤器。如authorization过滤器主要用来验证tenant_id等。

        app:定义了Trove API应用程序。Trove应用程序主要是versions和troveapp,url为/的请求将映射到该应用程序,而troveapp则会作为管道troveapi的应用程序。

        pipeline:定义了Trove API应用程序管道,M版本主要是troveapi,包含了多个filter和一个app。url为/v1.0的请求将映射到该管道进行处理。

        另外,在OpenStack中有三种wsgi配置方法(config/egg/call),trove-api初始化时加载api-paste.ini文件使用的是config模式,而composite中匹配版本则使用的call模式。

1.2 trove-api中处理WSGI请求的主要类与方法

        在trove-api处理请求的整个流程中,相关的类与方法主要由以下几个:

        1) versioned_urlmap()方法:该方法位于trove/common/wsgi.py中,主要用于将请求的url路径与composite中定义的版本进行匹配,而该方法中则主要返回了用于实现匹配算法的VersionedURLMap类的可调用对象。当trove接收到请求后,会调用该对象的__call__()方法匹配路径从而确定使用哪个app或pipeline。

        2) API类:该类的主要作用是添加URL映射,当接收请求时解析路径,路径匹配成功之后,调用相关方法执行相关操作并响应,API继承了wsgi.py中的Router类。

        3) Router类:初始化成员变量_router,该成员变量是routes.middleware.RoutesMiddleware类的实例化对象。该对象为可调用对象,在接收到请求时,首先通过mapper对象进行解析,然后将解析结果传递给Router中实现的_dispatch()方法对请求进行分发。

        4) Resource类:在API中添加URL映射时,传入的controller参数一般都为Resource类的实例。该类的主要作用主要有两点:首先,实现了对象的序列化与反序列化;然后,在Router对象分发之后,会进一步通过Resource类中的dispatch()方法得到具体的操作方法。

        5) Controller类:在API通过mapper解析到对应路径之后,则会调用具体的Controller对象中具体方法处理请求,并返回响应信息。

        6) Result类:主要封装了对请求的响应结果,包括状态码、状态信息以及响应消息体等。

1.3 trove WSGI具体调用流程

1.3.1 启动trove-api服务

        启动trove-api服务时,会调用trove/cmd/api.py中的main()方法对trove-api服务进行初始化。在该方法中,首先会通过CONF获取WSGI的配置文件,并获取服务限制的线程数。

conf_file = CONF.find_file(CONF.api_paste_config)workers = CONF.trove_api_workers or processutils.get_worker_count()

        获取到配置文件以后,则会调用wsgi.py中的launch()方法通过配置文件初始化WSGI配置。
launcher = wsgi.launch('trove', CONF.bind_port, conf_file,
host=CONF.bind_host,
workers=workers)
        launch()方法参数的主要含义如下:app_name为应用程序名称,在读取WSGI配置文件时,首先会找到与该参数对应的composite或app,Trove中为'trove';port为服务的端口号,Trove默认端口号为8779;conf_file为WSGI的配置文件,Trove默认为'api-paste.ini';host为WSGI服务端主机,默认为'0.0.0.0';workers为服务的线程数。
        该方法最终会调用paste插件中的loadapp()方法通过读取配置文件中的配置项初始化trove-api服务。
return deploy.loadapp("config:%s" % paste_config_file, name=app_name)
        在调用loadapp()方法时,paste插件会根据composite->pipeline->app->fileter的顺序调用配置文件中各插件对应的factory()方法。
def app_factory(global_conf, **local_conf):
return API()
        这些factory()方法则主要是初始化一些可调用对象。如trove-api中的API类的对象。
class API(wsgi.Router):
"""Defines the API routes."""
def __init__(self):
mapper = routes.Mapper()
super(API, self).__init__(mapper)
self._instance_router(mapper)
self._cluster_router(mapper)
self._datastore_router(mapper)
self._flavor_router(mapper)
self._versions_router(mapper)
self._limits_router(mapper)
self._backups_router(mapper)
self._configurations_router(mapper)
self._modules_router(mapper)
        初始化这些对象时,则主要是将trove-api支持的所有操作的URL添加到mapper映射中。如将创建、删除数据库实例等操作添加到mapper映射中。
def _instance_router(self, mapper):
instance_resource = InstanceController.create_resource()
mapper.connect("/{tenant_id}/instances,
controller=instance_resource,
action="index",
conditions={'method': ['GET']})
mapper.connect("/{tenant_id}/instances",
controller=instance_resource,
action="create",
conditions={'method': ['POST']})
        mapper为routes中的Mapper对象,在初始化时主要调用mapper对象的connect方法添加URL映射。该方法的参数主要由:第一个参数为请求的URL路径信息;controller为处理请求对应的Controller类,而在Trove中则传入的是Resource类,该类主要包含了具体操作的Controller对象,序列化对象和反序列化对象;action为Controller中进行操作的具体方法名;conditions中则定义了请求的方法GET、POST、PUT、PATCH等。
        初始化完成之后,会初始化WSGI对应的Service并启动服务。
server = base_wsgi.Service(app, port, host=host,
backlog=backlog, threads=threads)
return service.launch(CONF, server, workers)

1.3.2 接收请求并处理

        启动trove-api服务之后,trove就可以接收相关请求了。当使用REST API或命令行客户端发送请求时,首先会根据IP地址和port端口号将请求发送到trove服务中进行进一步解析。
        Trove接收到请求后,首先会调用VersionURLMap对象的__call__()方法,根据请求的路径信息匹配对应的版本,匹配成功之后将请求分发到相应的管道pipeline或应用程序app进行处理。在M版本的Trove中,如果请求前缀为/v1.0,则会将请求分发到troveapi管道中进行处理。
class VersionedURLMap(object):

def __call__(self, environ, start_response):
req = Request(environ)
if req.url_version is None and req.accept_version is not None:
version = "/v" + req.accept_version
http_exc = webob.exc.HTTPNotAcceptable(_("version not supported"))
app = self.urlmap.get(version, Fault(http_exc))
else:
app = self.urlmap
return app(version, start_response)
        在troveapi管道中,Trove组件会按照WSGI配置文件配置的过滤器对请求进行验证和过滤等操作。如authorization会验证请求的tenant_id,获取相关用户权限等。如果请求在过滤器中没有通过,则直接返回失败信息。
        如果请求通过了所有的过滤器验证后,则会调用相应的应用程序中返回的可调用对象的__call__()方法进行操作。
        首先,服务端会调用Router中的__call__()方法,而该方法返回的是Router中的成员变量_router,该变量是routes.middleware.RoutesMiddleware类的可调用对象。所以会调用其中的__call__()方法,而该方法则会通过调用Router类中的_dispatch()方法解析URL并将URL与mapper中的映射进行匹配。
class Resource(object):
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, request):
"""WSGI method that controls (de)serialization and method dispatch."""

try:
action, action_args, accept = self.deserialize_request(request)
except base_exception.InvalidContentType:
msg = _("Unsupported Content-Type")
return webob.exc.HTTPUnsupportedMediaType(explanation=msg)
except base_exception.MalformedRequestBody:
msg = _("Malformed request body")
return webob.exc.HTTPBadRequest(explanation=msg)

action_result = self.execute_action(action, request, **action_args)
try:
return self.serialize_response(action, action_result, accept)
# return unserializable result (typically a webob exc)
except Exception:
return action_result
在该方法中,则会根据请求信息获取请求需要的Controller类的具体方法。如发送的请求为创建数据库实例。则会找到InstanceController类中的create()方法进行创建数据库实例的具体操作。
class InstanceController(wsgi.Controller):
def create(self, req, body, tenant_id):
# TODO(hub-cap): turn this into middleware
LOG.info(_LI("Creating a database instance for tenant '%s'"),
tenant_id)
LOG.debug("req : '%s'\n\n", strutils.mask_password(req))
LOG.debug("body : '%s'\n\n", strutils.mask_password(body))
context = req.environ[wsgi.CONTEXT_KEY]
policy.authorize_on_tenant(context, 'instance:create')
context.notification = notification.DBaaSInstanceCreate(context,
request=req)
datastore_args = body['instance'].get('datastore', {})
datastore, datastore_version = (
datastore_models.get_datastore_version(**datastore_args))
image_id = datastore_version.image_id
name = body['instance']['name']
flavor_ref = body['instance']['flavorRef']
flavor_id = utils.get_id_from_href(flavor_ref)

configuration = self._configuration_parse(context, body)
databases = populate_validated_databases(
body['instance'].get('databases', []))
database_names = [database.get('_name', '') for database in databases]
users = None
try:
users = populate_users(body['instance'].get('users', []),
database_names)
except ValueError as ve:
raise exception.BadRequest(msg=ve)

modules = body['instance'].get('modules')

# The following operations have their own API calls.
# We need to make sure the same policies are enforced when
# creating an instance.
# i.e. if attaching configuration group to an existing instance is not
# allowed, it should not be possible to create a new instance with the
# group attached either
if configuration:
policy.authorize_on_tenant(context, 'instance:update')
if modules:
policy.authorize_on_tenant(context, 'instance:module_apply')
if users:
policy.authorize_on_tenant(
context, 'instance:extension:user:create')
if databases:
policy.authorize_on_tenant(
context, 'instance:extension:database:create')

if 'volume' in body['instance']:
volume_info = body['instance']['volume']
volume_size = int(volume_info['size'])
volume_type = volume_info.get('type')
else:
volume_size = None
volume_type = None

if 'restorePoint' in body['instance']:
backupRef = body['instance']['restorePoint']['backupRef']
backup_id = utils.get_id_from_href(backupRef)
else:
backup_id = None

availability_zone = body['instance'].get('availability_zone')
nics = body['instance'].get('nics')

slave_of_id = body['instance'].get('replica_of',
# also check for older name
body['instance'].get('slave_of'))
replica_count = body['instance'].get('replica_count')
locality = body['instance'].get('locality')
if locality:
locality_domain = ['affinity', 'anti-affinity']
locality_domain_msg = ("Invalid locality '%s'. "
"Must be one of ['%s']" %
(locality,
"', '".join(locality_domain)))
if locality not in locality_domain:
raise exception.BadRequest(msg=locality_domain_msg)
if slave_of_id:
dupe_locality_msg = (
'Cannot specify locality when adding replicas to existing '
'master.')
raise exception.BadRequest(msg=dupe_locality_msg)
region_name = body['instance'].get('region_name', CONF.os_region_name)

instance = models.Instance.create(context, name, flavor_id,
image_id, databases, users,
datastore, datastore_version,
volume_size, backup_id,
availability_zone, nics,
configuration, slave_of_id,
replica_count=replica_count,
volume_type=volume_type,
modules=modules,
locality=locality,
region_name=region_name)

view = views.InstanceDetailView(instance, req=req)
return wsgi.Result(view.data(), 200)

1.3.3 处理请求后返回结果

        在调用COntroller类中具体操作方法完成处理操作之后,则会返回响应结果。
        有返回结果:
view = views.InstanceDetailView(instance, req=req)
return wsgi.Result(view.data(), 200)
        无返回结果:
return wsgi.Result(None, 202)
        对于有返回结果的操作,Controller的方法在处理结束之后,会将获取到的数据封装为view层中的对象并传入Result类中进行返回;而对于没有返回结果的操作,处理结束之后,则会直接返回Result对象,而其中的body则为空。

2. trove-taskmanager服务RPC调用流程

2.1 OpenStack处理消息队列的oslo-messaging依赖包

2.1.1 与RabbitMQ调用相关的类和方法

        Trove在M版本中使用OpenStack提供的oslo-messaging依赖包进行RPC调用,在oslo-messaging中与RPC调用相关的类和方法主要由以下几个:

        1) Target类:该类主要用于描述客户端发送的消息去哪儿和服务端接收什么消息。其主要成员变量有:

                exchange:将接收到的消息分类,并告知消息分发到何种路由何种队列。

                topic:是RPC消息的唯一标识,客户端发送topic的消息,服务端则接收处理对应topic的消息。

                namespace:服务端可以在一个topic上提供多个方法集合,这个集合通过namespace分开管理。

                fanout:如果为True,则将消息发送到所有满足条件的server上,此时会忽略topic指定的内容。

                server:服务端标识。

                version:标识rpc api的版本。

        2) Transport类:实现监听和发送消息的抽象层,具体实现则是由Transport成员变量_driver来定义的。

        3) RabbitDriver类:具体实现消息的监听,发送等操作,在OpenStack Trove组件中使用的是RabbitMQ消息队列,因此在调用时会匹配到RabbitDriver类。

        4) MassageHandlingServer类:监听消息的服务端。将一个Transport和一个PRCDispatcher联系起来,用于分发和处理消息。

        5) RPCDispatcher类:定义了具体的消息分发机制。

        6) RPCClient类:消息客户端,利用一个可调用_CallContext对象发送消息。

2.1.2 OpenStack RPC处理流程

        在OpenStack使用RPC传输和处理消息时,首先会根据消息接收端的Target和endpoints初始化一个RPCDispatcher,然后会根据具体的Transport和定义的RPCDispatcher创建一个MessageHandlingServer接收并处理消息。

        当实例化一个RPCClient之后,客户端发送cast或call消息,最终会调用Transport中定义的RabbitDriver对应的send()方法发送消息。

        当Server端接收到消息后,则会调用RPCDispatcher对象中的dispatch()方法定义的规则找到具体的Manager处理消息中对应的操作。

        如果是call调用,则会监听处理方法时候执行完成,当执行完成之后则返回相应的值。

2.2 Trove RPC处理流程

2.2.1 启动trove-taskmanager服务

        当启动trove-taskmanager服务时,Trove会调用trove/cmd/taskmanager.py中的main()方法完成服务的初始化操作并启动服务。

@with_initialize(extra_opts=extra_opts)
def main(conf):
startup(conf, conf.taskmanager_queue)
        在调用main()方法之前,会首先调用@with_initialize装饰器,在其中会调用rpc.init()方法初始化RPC调用。而init()方法中主要是获取了一个服务端的Transport,该Transport对象中设置了实际处理消息的驱动Driver,Trove默认使用RabbitMQ进行消息传输,所以初始化时设置的为RabbitDriver。

TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
        得到Transport之后,则会进入main()方法,该方法中则调用了startup()方法进行具体的操作。在startup()方法中,首先初始化了一个用于接收消息的服务端RPCService。
server = rpc_service.RpcService(
manager=conf.taskmanager_manager, topic=topic,
rpc_api_version=rpc_version.RPC_API_VERSION)
        传入的参数主要有:manager指定了处理具体操作的Manager类,在trove-taskmanager中为trove.taskmanager.manager.Manager;topic是服务端接收消息的唯一标识,相当于RabbitMQ中的binding key,exchange也是根据topic来进行消息分发的,taskmanager中默认使用的topic为taskmanager;rpc_api_version则表示了rpc api的版本,主要在升级时使用,M版本的Trove使用的是1.0版本的RPC调用。
        初始化Service完成之后,则会调用Service的launch()方法启动服务,启动服务时,会调用RPCService中的start()方法启动trove-taskmanager的RPC服务。在start()方法中,最终会调用oslo-messagine的get_rpc_server()方法获取一个用于接收taskmanager消息的服务。

def get_rpc_server(transport, target, endpoints,
executor='blocking', serializer=None):
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
        该方法主要的入参有:transport用于获取处理RabbitMQ消息的RabbitDriver驱动;target用于指定该服务接收什么消息;endpoints则指定了处理消息的RPCDispatcher;然后根据transport、dispatcher和executor创建并返回一个用于接收消息的MessageHandlingServer。创建完成之后,会调用该Server的start()方法启动该Server。

2.2.2 trove-taskmanager发送消息流程

        一般地,Trove首先会调用taskmanager的api中的方法发送rpc消息。所以,在调用taskmanager的API之前,会先调用其初始化方法。

class API(object):
def __init__(self, context):
self.context = context
super(API, self).__init__()

version_cap = self.VERSION_ALIASES.get(
CONF.upgrade_levels.taskmanager, CONF.upgrade_levels.taskmanager)
target = messaging.Target(topic=CONF.taskmanager_queue,
version=version_cap)

self.client = self.get_client(target, version_cap)
        在调用__init__()方法时,首先创建了一个target,该target指定了消息将发送到哪儿。其中,topic相当于RabbitMQ中的routing key,需要与Server中的topic匹配,不一定需要完全匹配,在trove-taskmanager中,API中的topic与Server中的topic都为taskmanager。

        得到target之后,会根据该target的对应的版本创建一个RPC客户端。最终会调用trove/rpc.py中的get_client()方法得到一个RPCClient对象。

        初始化API得到对应的对象之后,就可以调用具体方法发送消息。这些方法最终都会调用API中的_cast()或_call()方法发送消息。在trove-taskmanager中通常只需要调用_cast()方法。

def _cast(self, method_name, version, **kwargs):
LOG.debug("Casting %s" % method_name)
with NotificationCastWrapper(self.context, 'taskmanager'):
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, method_name, **kwargs)
        该方法中,首先调用RPCClient中的prepare()方法获取一个_CallContext对象,然后调用该对象的cast()方法发送消息。该方法的参数主要包含了context表示上下文信息;method_name则表示了当接收到消息后需要调用的处理方法;kwargs中则包含了调用处理方法时所需的参数等。

2.2.3 trove-taskmanager接收并处理消息流程

        trove-taskmanager发送消息成功之后,OpenStack的oslo-messaging会负责接收消息,并进行分发处理,此时会调用RPCDispatcher中的dispatch()方法得到Manager对应的处理方法,之后则会调用trove-taskmanager中的Manager的对应方法进行具体处理。         如果为_call()请求,Manager在处理完之后,会返回相应的处理结果。如果等待超时,则会记录错误日志。