Openstack如何发现计算节点

时间:2023-01-09 00:16:16

计算节点服务启动后会调用nova\cmd\compute.py里的main函数

def main():
    """Entry point of the nova-compute service binary."""
    # Parse command line / config file options before anything reads CONF.
    config.parse_args(sys.argv)
    logging.setup('nova')
    utils.monkey_patch()
    # Register all nova versioned objects so they can be serialized over RPC.
    objects.register_all()

    # Guru Meditation reports: dump service state on signal, for debugging.
    gmr.TextGuruMeditation.setup_autorun(version)

    if not CONF.conductor.use_local:
        # Remote-conductor mode: forbid direct DB access from nova-compute
        # and route object operations through the conductor RPC API instead.
        block_db_access()
        objects_base.NovaObject.indirection_api = \
            conductor_rpcapi.ConductorAPI()
    # Build the service object for the 'nova-compute' binary.
    server = service.Service.create(binary='nova-compute',
                                    topic=CONF.compute_topic,
                                    db_allowed=CONF.conductor.use_local)
    # Launch the service and block until it exits.
    service.serve(server)
    service.wait()

两步,一个是构建service,一个是启动service,实现都在nova\service.py


@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
           report_interval=None, periodic_enable=None,
           periodic_fuzzy_delay=None, periodic_interval_max=None,
           db_allowed=True):
    """Instantiates class and passes back application object.

    :param host: defaults to CONF.host
    :param binary: defaults to basename of executable
    :param topic: defaults to bin_name - 'nova-' part
    :param manager: defaults to CONF.<topic>_manager
    :param report_interval: defaults to CONF.report_interval
    :param periodic_enable: defaults to CONF.periodic_enable
    :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
    :param periodic_interval_max: if set, the max time to wait between runs

    """
    # host is None when called from nova-compute's main(); CONF.host is
    # initialized to socket.gethostname() in nova/netconf.py
    # (CONF.import_opt('host', 'nova.netconf')), i.e. the host defaults
    # to the machine's hostname.
    if not host:
        host = CONF.host
    if not binary:
        binary = os.path.basename(sys.argv[0])
    if not topic:
        # 'nova-compute' -> topic 'compute'
        topic = binary.rpartition('nova-')[2]
    if not manager:
        # Look up the manager class name from CONF.<topic>_manager,
        # e.g. compute_manager -> nova.compute.manager.ComputeManager.
        manager_cls = ('%s_manager' %
                       binary.rpartition('nova-')[2])
        manager = CONF.get(manager_cls, None)
    if report_interval is None:
        report_interval = CONF.report_interval
    if periodic_enable is None:
        periodic_enable = CONF.periodic_enable
    if periodic_fuzzy_delay is None:
        periodic_fuzzy_delay = CONF.periodic_fuzzy_delay

    debugger.init()

    service_obj = cls(host, binary, topic, manager,
                      report_interval=report_interval,
                      periodic_enable=periodic_enable,
                      periodic_fuzzy_delay=periodic_fuzzy_delay,
                      periodic_interval_max=periodic_interval_max,
                      db_allowed=db_allowed)

    return service_obj
def start(self):
    """Start serving this service using loaded configuration.

    Also, retrieve updated port number in case '0' was passed in, which
    indicates a random port should be used.

    :returns: None

    """
    if self.manager:
        # For the compute node, manager is initialized to ComputeManager
        # from nova/compute/manager.py; init_host() indirectly drives the
        # virt driver.
        self.manager.init_host()
        self.manager.pre_start_hook()
        if self.backdoor_port is not None:
            self.manager.backdoor_port = self.backdoor_port
    # Eventually executed by run_service.
    self.server.start()
    if self.manager:
        self.manager.post_start_hook()

构建服务时,初始化host为主机名,并在构建manager时作为参数传入,对于计算节点,在create服务时,将manager初始化为nova\compute\manager.py里的ComputeManager:

# host is None at this point; CONF.host is initialized to the machine's
# hostname via socket.gethostname() in nova/netconf.py
# (CONF.import_opt('host', 'nova.netconf')).
if not host:
    host = CONF.host
service_obj = cls(host, binary, topic, manager,
                  report_interval=report_interval,
                  periodic_enable=periodic_enable,
                  periodic_fuzzy_delay=periodic_fuzzy_delay,
                  periodic_interval_max=periodic_interval_max,
                  db_allowed=db_allowed)

return service_obj

再看下ComputeManager,是继承于manager.Manager,初始化函数:

ComputeManager

class ComputeManager(manager.Manager):
    """Manages the running instances from creation to destruction."""

    target = messaging.Target(version='3.35')

    # How long to wait in seconds before re-issuing a shutdown
    # signal to a instance during power off. The overall
    # time to wait is set by CONF.shutdown_timeout.

    # nova/compute/manager.py/ComputeManager.__init__
    # The *.API() attributes below are built from class names configured in
    # /etc/nova/nova.conf. The client-side RPC APIs each create an RPCClient
    # bound to a specific Target (where messages are sent) and Transport
    # (the messaging layer); see nova/compute/rpcapi.py/ComputeAPI for a
    # concrete example of the implementation.

    SHUTDOWN_RETRY_INTERVAL = 10

    def __init__(self, compute_driver=None, *args, **kwargs):
        """Load configuration options and connect to the hypervisor."""
        self.virtapi = ComputeVirtAPI(self)
        self.network_api = network.API()
        self.volume_api = volume.API()
        self.image_api = image.API()
        self._last_host_check = 0
        self._last_bw_usage_poll = 0
        self._bw_usage_supported = True
        self._last_bw_usage_cell_update = 0
        self.compute_api = compute.API()
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        self.conductor_api = conductor.API()
        self.compute_task_api = conductor.ComputeTaskAPI()
        self.is_neutron_security_groups = (
            openstack_driver.is_neutron_security_groups())
        self.consoleauth_rpcapi = consoleauth.rpcapi.ConsoleAuthAPI()
        self.cells_rpcapi = cells_rpcapi.CellsAPI()
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
        # nodename -> ResourceTracker map, rebuilt on each
        # update_available_resource() pass.
        self._resource_tracker_dict = {}
        self.instance_events = InstanceEvents()
        self._sync_power_pool = (
            eventlet.GreenPool(CONF.sync_power_state_concurrency))
        self._syncs_in_progress = {}

        super(ComputeManager, self).__init__(service_name="compute",
                                             *args, **kwargs)

        # NOTE(russellb) Load the driver last. It may call back into the
        # compute manager via the virtapi, so we want it to be fully
        # initialized before that happens.
        # For VMware this loads nova/virt/vmwareapi/driver.py.
        self.driver = driver.load_compute_driver(self.virtapi, compute_driver)
        self.use_legacy_block_device_info = \
            self.driver.need_legacy_block_device_info

manager.Manager

class Manager(base.Base, periodic_task.PeriodicTasks):

    def __init__(self, host=None, db_driver=None, service_name='undefined'):
        """Base manager: records the service host and sets up notifications.

        :param host: service host; falls back to CONF.host, which
                     nova/netconf.py initializes from socket.gethostname()
        :param db_driver: database driver handed to base.Base
        :param service_name: label used for the RPC notifier
        """
        # The manager keeps the (possibly defaulted) hostname on self.host.
        self.host = host or CONF.host
        self.backdoor_port = None
        self.service_name = service_name
        self.notifier = rpc.get_notifier(self.service_name, self.host)
        self.additional_endpoints = []
        super(Manager, self).__init__(db_driver)

也即是self.host被初始化为主机名.
在启动服务时,self.manager.pre_start_hook()会调用ComputeManager的pre_start_hook函数:


def pre_start_hook(self):
    """After the service is initialized, but before we fully bring
    the service up by listening on RPC queues, make sure to update
    our available resources (and indirectly our available nodes).
    """
    # Run the first resource audit under an admin context, before the
    # periodic task machinery takes over.
    admin_context = nova.context.get_admin_context()
    self.update_available_resource(admin_context)

# Marked as a periodic task: runs on a timer in addition to the explicit
# call from pre_start_hook() at service startup.
@periodic_task.periodic_task
def update_available_resource(self, context):
    """See driver.get_available_resource()

    Periodic process that keeps that the compute host's understanding of
    resource availability and usage in sync with the underlying hypervisor.

    :param context: security context
    """
    new_resource_tracker_dict = {}
    # One entry per hypervisor node managed by this compute host.
    nodenames = set(self.driver.get_available_nodes())
    for nodename in nodenames:
        rt = self._get_resource_tracker(nodename)
        # Report this compute node's resource information.
        rt.update_available_resource(context)
        new_resource_tracker_dict[nodename] = rt

    # Delete orphan compute node not reported by driver but still in db
    # Fetch the compute-node records currently stored in the database.
    compute_nodes_in_db = self._get_compute_nodes_in_db(context,
                                                        use_slave=True)

    # Remove DB records for hypervisors no longer managed by this node.
    for cn in compute_nodes_in_db:
        if cn.hypervisor_hostname not in nodenames:
            LOG.audit(_("Deleting orphan compute node %s") % cn.id)
            cn.destroy()

    self._resource_tracker_dict = new_resource_tracker_dict

可以看到pre_start_hook会调用update_available_resource,除了在初始化时调用一次update_available_resource,该函数还会被定时调用,因为有 @periodic_task.periodic_task.在update_available_resource函数中会调用rt.update_available_resource(实现在nova\compute\resource_tracker.py)
在分析rt.update_available_resource函数之前,我们看下这个rt是个什么东西?

rt = self._get_resource_tracker(nodename)

原来是通过调用_get_resource_tracker获得的:

    def _get_resource_tracker(self, nodename):
rt = self._resource_tracker_dict.get(nodename)
if not rt:
if not self.driver.node_is_available(nodename):
raise exception.NovaException(
_("%s is not a valid node managed by this "
"compute host.") % nodename)
#这里要传入host
rt = resource_tracker.ResourceTracker(self.host,
self.driver,
nodename)
self._resource_tracker_dict[nodename] = rt
return rt

可以看到是个ResourceTracker对象,初始化时传入了self.host和nodename,self.host也就是前面提到初始化manager时传入的主机名,nodename是通过驱动获取的虚拟机管理器(一个计算节点可以下辖多个虚拟机管理器).看下ResourceTracker的初始化函数:

class ResourceTracker(object):
    """Compute helper class for keeping track of resource usage as instances
    are built and destroyed.
    """

    def __init__(self, host, driver, nodename):
        """:param host: service host name (CONF.host, i.e. the hostname)
        :param driver: virt driver of the owning compute service
        :param nodename: hypervisor node tracked by this instance (one
                         compute host may manage several hypervisor nodes)
        """
        self.host = host
        self.driver = driver
        self.pci_tracker = None
        self.nodename = nodename
        # Cached compute-node record; stays None until the first audit.
        self.compute_node = None
        self.stats = importutils.import_object(CONF.compute_stats_class)
        self.tracked_instances = {}
        self.tracked_migrations = {}
        self.conductor_api = conductor.API()
        monitor_handler = monitors.ResourceMonitorHandler()
        self.monitors = monitor_handler.choose_monitors(self)
        self.ext_resources_handler = \
            ext_resources.ResourceHandler(CONF.compute_resources)
        self.notifier = rpc.get_notifier()
        self.old_resources = {}
        self.scheduler_client = scheduler_client.SchedulerClient()

ResourceTracker的self.host和self.nodename被传入的参数初始化.现在我们回来看ResourceTracker的update_available_resource函数:

def update_available_resource(self, context):
    """Override in-memory calculations of compute node resource usage based
    on data audited from the hypervisor layer.

    Add in resource claims in progress to account for operations that have
    declared a need for resources, but not necessarily retrieved them from
    the hypervisor layer yet.
    """
    LOG.audit(_("Auditing locally available compute resources"))
    # Ask the virt driver for this node's resource inventory.
    resources = self.driver.get_available_resource(self.nodename)

    if not resources:
        # The virt driver does not support this function
        LOG.audit(_("Virt driver does not support "
            "'get_available_resource' Compute tracking is disabled."))
        self.compute_node = None
        return
    resources['host_ip'] = CONF.my_ip

    # TODO(berrange): remove this once all virt drivers are updated
    # to report topology
    if "numa_topology" not in resources:
        resources["numa_topology"] = None

    self._verify_resources(resources)
    # Log the hypervisor's view of the node's resources.
    self._report_hypervisor_resource_view(resources)
    # Update the compute-node database record with the audited resources.
    self._update_available_resource(context, resources)

接着调用self._update_available_resource(context, resources)

def update_available_resource(self, context):
    """Override in-memory calculations of compute node resource usage based
    on data audited from the hypervisor layer.

    Add in resource claims in progress to account for operations that have
    declared a need for resources, but not necessarily retrieved them from
    the hypervisor layer yet.
    """
    LOG.audit(_("Auditing locally available compute resources"))
    # Ask the virt driver for this node's resource inventory.
    resources = self.driver.get_available_resource(self.nodename)

    if not resources:
        # The virt driver does not support this function
        LOG.audit(_("Virt driver does not support "
            "'get_available_resource' Compute tracking is disabled."))
        self.compute_node = None
        return
    resources['host_ip'] = CONF.my_ip

    # TODO(berrange): remove this once all virt drivers are updated
    # to report topology
    if "numa_topology" not in resources:
        resources["numa_topology"] = None

    self._verify_resources(resources)
    self._report_hypervisor_resource_view(resources)
    # Update the compute-node database record with the audited resources.
    self._update_available_resource(context, resources)

接着调用self._update_available_resource(context, resources)

def _update_available_resource(self, context, resources):
    """Reconcile audited hypervisor resources with instance, migration and
    orphan usage, then persist the result.

    :param context: security context
    :param resources: resource dict reported by the virt driver
    """
    # initialise the compute node object, creating it
    # if it does not already exist.
    # Loads (or creates) this node's resource record from the database.
    self._init_compute_node(context, resources)

    # if we could not init the compute node the tracker will be
    # disabled and we should quit now
    if self.disabled:
        return

    if 'pci_passthrough_devices' in resources:
        if not self.pci_tracker:
            n_id = self.compute_node['id'] if self.compute_node else None
            self.pci_tracker = pci_manager.PciDevTracker(context,
                                                         node_id=n_id)
        self.pci_tracker.set_hvdevs(jsonutils.loads(resources.pop(
            'pci_passthrough_devices')))

    # Grab all instances assigned to this node:
    instances = objects.InstanceList.get_by_host_and_node(
        context, self.host, self.nodename,
        expected_attrs=['system_metadata',
                        'numa_topology'])

    # Now calculate usage based on instance utilization:
    self._update_usage_from_instances(context, resources, instances)

    # Grab all in-progress migrations:
    capi = self.conductor_api
    migrations = capi.migration_get_in_progress_by_host_and_node(context,
        self.host, self.nodename)

    self._update_usage_from_migrations(context, resources, migrations)

    # Detect and account for orphaned instances that may exist on the
    # hypervisor, but are not in the DB:
    orphans = self._find_orphaned_instances()
    self._update_usage_from_orphans(context, resources, orphans)

    # NOTE(yjiang5): Because pci device tracker status is not cleared in
    # this periodic task, and also because the resource tracker is not
    # notified when instances are deleted, we need remove all usages
    # from deleted instances.
    if self.pci_tracker:
        self.pci_tracker.clean_usage(instances, migrations, orphans)
        resources['pci_stats'] = jsonutils.dumps(self.pci_tracker.stats)
    else:
        resources['pci_stats'] = jsonutils.dumps([])

    self._report_final_resource_view(resources)

    metrics = self._get_host_metrics(context, self.nodename)
    resources['metrics'] = jsonutils.dumps(metrics)
    # Report the hypervisor node's information (DB record + scheduler).
    self._update(context, resources)
    LOG.info(_LI('Compute_service record updated for %(host)s:%(node)s'),
             {'host': self.host, 'node': self.nodename})

接着是self._update(context, resources)

def _update(self, context, values):
    """Update partial stats locally and populate them to Scheduler."""
    self._write_ext_resources(values)
    # NOTE(pmurray): the stats field is stored as a json string. The
    # json conversion will be done automatically by the ComputeNode object
    # so this can be removed when using ComputeNode.
    values['stats'] = jsonutils.dumps(values['stats'])

    # Skip the update entirely when nothing changed since the last report.
    if not self._resource_change(values):
        return
    if "service" in self.compute_node:
        del self.compute_node['service']
    # NOTE(sbauza): Now the DB update is asynchronous, we need to locally
    # update the values
    self.compute_node.update(values)
    # Persist the stats to the Scheduler
    self._update_resource_stats(context, values)
    if self.pci_tracker:
        self.pci_tracker.save(context)

接着是self._update_resource_stats(context, values)

    def _update_resource_stats(self, context, values):
stats = values.copy()
stats['id'] = self.compute_node['id']
self.scheduler_client.update_resource_stats(
context, (self.host, self.nodename), stats)

在_update_resource_stats函数中,通过RPC调用将主机信息和虚拟机管理器信息上报给nova-scheduler,这样openstack就识别了计算节点.