Openstack liberty 云主机迁移源码分析之在线迁移3

时间:2023-02-02 02:14:24

这是在线迁移源码分析的第三篇。在《Openstack liberty 云主机迁移源码分析之在线迁移2》中分析了prepare准备阶段nova-compute的处理过程,本文将分析execute执行阶段的处理过程,下面一起来看具体内容:

execute执行阶段

#`nova/compute/manager.py/ComputeManager._do_live_migration`
def _do_live_migration(self, context, dest, instance,
                       block_migration,
                       migration, migrate_data):
    """Run the execute phase of a live migration.

    Stores the prepare-phase result in ``migrate_data``, marks the
    migration record as ``running`` and hands the work over to the virt
    driver. If the driver call raises, the failure is logged, the
    migration record is marked ``failed`` and the exception is re-raised.

    :param context: request context, passed through to the driver
    :param dest: destination host, passed through to the driver
    :param instance: the instance being migrated
    :param block_migration: passed through to the driver unchanged
    :param migration: migration record exposing ``status`` and ``save()``;
        when falsy, no status is recorded
    :param migrate_data: dict updated in place with the keys
        ``pre_live_migration_result`` and ``migration``
    :raises Exception: whatever ``self.driver.live_migration`` raised
    """
    # NOTE: the prepare-phase code is omitted from this excerpt (it is
    # covered in the previous article of this series). It leaves its
    # result in ``pre_migration_data``, a dict shaped like:
    #     {'graphics_listen_addrs': {}, 'volume': {},
    #      'serial_listen_addr': {}}
    migrate_data['pre_live_migration_result'] = pre_migration_data

    # Update the `nova.instance_migrations` record: status -> running.
    if migration:
        migration.status = 'running'
        migration.save()

    migrate_data['migration'] = migration
    try:
        # Ask the virt driver (LibvirtDriver in this walkthrough) to
        # perform the migration; the post/rollback methods are handed
        # over as callbacks.
        self.driver.live_migration(context, instance, dest,
                                   self._post_live_migration,
                                   self._rollback_live_migration,
                                   block_migration, migrate_data)
    except Exception:
        # Executing live migration
        # live_migration might raises exceptions, but
        # nothing must be recovered in this version.
        LOG.exception(_LE('Live migration failed.'),
                      instance=instance)
        # Migration failed: set the `nova.instance_migrations` status to
        # failed, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            if migration:
                migration.status = 'failed'
                migration.save()

---------------------------------------------------------------
#接上文:`nova/virt/libvirt/driver.py/LibvirtDriver.live_migration`
def live_migration(self, context, instance, dest,
                   post_method, recover_method,
                   block_migration=False,
                   migrate_data=None):
    """Spawn a live migration operation for distributing high load.

    Validates ``dest`` and then delegates to ``_live_migration`` with
    the same arguments.

    :param dest: destination host name; substituted into the migration
        URI, so it is validated first
    :param post_method: callback forwarded to ``_live_migration``
    :param recover_method: callback forwarded to ``_live_migration``
    :raises exception.InvalidHostname: if ``dest`` is rejected by
        ``libvirt_utils.is_valid_hostname``
    """
    # 'dest' will be substituted into 'migration_uri' so ensure
    # it doesn't contain any characters that could be used to
    # exploit the URI accepted by libvirt.
    # Per the original note, a valid host name holds only word
    # characters, '_', '-', '.' and ':'.
    if not libvirt_utils.is_valid_hostname(dest):
        raise exception.InvalidHostname(hostname=dest)

    # Analysed in the next block.
    self._live_migration(context, instance, dest,
                         post_method, recover_method,
                         block_migration,
                         migrate_data)

---------------------------------------------------------------
#接上文:
def _live_migration(self, context, instance, dest, post_method,
                    recover_method, block_migration,
                    migrate_data):
    """Do live migration.

    This fires off a new thread to run the blocking migration
    operation, and then this thread monitors the progress of
    migration and controls its operation.

    :param post_method: callback forwarded to the monitor
    :param recover_method: callback forwarded to the monitor
    """
    # Look up the Guest wrapper for this instance (per the original
    # note, it is built from the libvirt virDomain object).
    guest = self._host.get_guest(instance)

    # TODO(sahid): We are converting all calls from a
    # virDomain object to use nova.virt.libvirt.Guest.
    # We should be able to remove dom at the end.
    dom = guest._domain

    # Start a new thread that runs the migration operation
    # (_live_migration_operation).
    opthread = utils.spawn(self._live_migration_operation,
                           context, instance, dest,
                           block_migration,
                           migrate_data, dom)

    # Event linked to the operation thread; the monitor checks it to
    # learn whether that thread has finished.
    finish_event = eventlet.event.Event()

    def thread_finished(thread, event):
        LOG.debug("Migration operation thread notification",
                  instance=instance)
        event.send()
    opthread.link(thread_finished, finish_event)

    # Let eventlet schedule the new thread right away
    time.sleep(0)

    # NOTE: exception handling is omitted from this excerpt; per the
    # original note, any exception is simply propagated to the caller.
    self._live_migration_monitor(context, instance, guest,
                                 dest,
                                 post_method,
                                 recover_method,
                                 block_migration,
                                 migrate_data,
                                 dom, finish_event)
    LOG.debug("Live migration monitoring is all done",
              instance=instance)

小结:上述过程很简单:更新迁移状态及校验目标主机名,之后创建线程执行迁移操作(_live_migration_operation),并通过事件监控迁移状态

迁移操作过程

由上文分析可知,执行迁移操作的线程函数为:_live_migration_operation(无论是否为块迁移都会执行该函数),下面来看具体内容:

def _live_migration_operation(self, context, instance, dest,
                              block_migration,
                              migrate_data, dom):
    """Invoke the live migration operation.

    This method is intended to be run in a background thread
    and will block that thread until the migration is finished
    or failed.

    :param dest: destination host, substituted into
        ``CONF.libvirt.live_migration_uri``
    :param block_migration: selects ``block_migration_flag`` instead of
        ``live_migration_flag`` as the source of libvirt flags
    :param migrate_data: optional dict; its
        ``pre_live_migration_result`` entry is read if present
    :param dom: libvirt domain object the migration is issued on
    :raises libvirt.libvirtError: if ``migrateToURI2`` fails with any
        error code other than ``VIR_ERR_CONFIG_UNSUPPORTED``
    """
    guest = libvirt_guest.Guest(dom)

    # NOTE: the surrounding try/except is omitted from this excerpt; per
    # the original note, it logs the error and re-raises.

    # Read the migration flags from the configuration. In the author's
    # example block_migration=False and
    # live_migration_flag="VIR_MIGRATE_UNDEFINE_SOURCE,
    # VIR_MIGRATE_PEER2PEER,VIR_MIGRATE_LIVE,VIR_MIGRATE_PERSIST_DEST"
    if block_migration:
        flaglist = CONF.libvirt.block_migration_flag.split(',')
    else:
        flaglist = CONF.libvirt.live_migration_flag.split(',')
    # Map each flag name to its libvirt constant and OR them together.
    flagvals = [getattr(libvirt, x.strip()) for x in flaglist]
    logical_sum = six.moves.reduce(lambda x, y: x | y,
                                   flagvals)

    # pre_live_migration_result is filled in during the prepare phase.
    pre_live_migrate_data = (migrate_data or {}).get(
        'pre_live_migration_result', {})
    # Graphics (vnc/spice) listen addresses.
    listen_addrs = \
        pre_live_migrate_data.get('graphics_listen_addrs')
    # Volume information.
    volume = pre_live_migrate_data.get('volume')
    # Serial console listen address.
    serial_listen_addr = pre_live_migrate_data.get(
        'serial_listen_addr')

    # None when this libvirt lacks VIR_DOMAIN_XML_MIGRATABLE.
    migratable_flag = getattr(libvirt,
                              'VIR_DOMAIN_XML_MIGRATABLE', None)

    # Legacy path: VIR_DOMAIN_XML_MIGRATABLE is unavailable, or there
    # are no graphics listen addresses and no volume data.
    if (migratable_flag is None or
            (listen_addrs is None and not volume)):
        # TODO(alexs-h): These checks could be moved to the
        # check_can_live_migrate_destination/source phase
        # Per the original note: raises unless the configured vnc/spice
        # listen address is one of
        # ('0.0.0.0', '127.0.0.1', '::', '::1').
        self._check_graphics_addresses_can_live_migrate(
            listen_addrs)
        # Per the original note: ensures
        # CONF.serial_console.enabled is False.
        self._verify_serial_console_is_disabled()
        # libvirt carries out the migration.
        dom.migrateToURI(
            CONF.libvirt.live_migration_uri % dest,
            logical_sum, None,
            CONF.libvirt.live_migration_bandwidth)
    else:
        # Dump the migratable XML, then merge in the volume, graphics
        # and serial data to build the XML sent to the destination.
        old_xml_str = guest.get_xml_desc(dump_migratable=True)
        new_xml_str = self._update_xml(old_xml_str,
                                       volume,
                                       listen_addrs,
                                       serial_listen_addr)
        try:
            # libvirt carries out the migration.
            dom.migrateToURI2(
                CONF.libvirt.live_migration_uri % dest,
                None,
                new_xml_str,
                logical_sum,
                None,
                CONF.libvirt.live_migration_bandwidth)
        except libvirt.libvirtError as ex:
            # NOTE(mriedem): There is a bug in older versions
            # of libvirt where the VIR_DOMAIN_XML_MIGRATABLE flag
            # causes virDomainDefCheckABIStability to not compare
            # the source and target domain xml's correctly for
            # the CPU model. We try to handle that error here and
            # attempt the legacy migrateToURI path, which could
            # fail if the console addresses are not correct, but
            # in that case we have the
            # _check_graphics_addresses_can_live_migrate
            # check in place to catch it.
            # TODO(mriedem): Remove this workaround when
            # Red Hat BZ #1141838 is closed.
            # Retry through the legacy path only for
            # VIR_ERR_CONFIG_UNSUPPORTED; re-raise anything else.
            error_code = ex.get_error_code()
            if error_code == libvirt.VIR_ERR_CONFIG_UNSUPPORTED:
                LOG.warn(_LW('An error occurred trying to live '
                             'migrate. Falling back to legacy live '
                             'migrate flow. Error: %s'), ex,
                         instance=instance)

                self._check_graphics_addresses_can_live_migrate(
                    listen_addrs)
                self._verify_serial_console_is_disabled()
                dom.migrateToURI(
                    CONF.libvirt.live_migration_uri % dest,
                    logical_sum,
                    None,
                    CONF.libvirt.live_migration_bandwidth)
            else:
                raise

    # The migration call has returned.
    LOG.debug("Migration operation thread has finished",
              instance=instance)

小结:执行参数配置和条件检查,然后由libvirt完成迁移过程

状态监视

def _live_migration_monitor(self, context, instance, guest,
                            dest, post_method,
                            recover_method,
                            block_migration,
                            migrate_data, dom,
                            finish_event):
    """Poll a running live migration and react to its job state.

    Loops every 0.5 seconds reading the domain's job info. While the
    job is running it enforces the progress and completion timeouts,
    raises the maximum downtime step by step and records progress on
    the instance. The loop ends once ``post_method`` (job completed) or
    ``recover_method`` (job failed or cancelled) has been called.

    :param guest: Guest wrapper, used to check whether the VM is still
        active on this host
    :param post_method: called as ``post_method(context, instance,
        dest, block_migration, migrate_data)`` on completion
    :param recover_method: called with the same arguments on failure
        or cancellation
    :param dom: libvirt domain object queried for job info
    :param finish_event: event whose ``ready()`` tells whether the
        operation thread has finished
    :raises libvirt.libvirtError: if aborting the job fails
    """
    # Amount of data to move. Per the original note: memory size from
    # the flavor plus disk size from the guest; cinder volumes on
    # shared storage (e.g. nfs, rbd) are not migrated, only local lvm
    # block devices or local raw/qcow2 files are.
    data_gb = self._live_migration_data_gb(instance, guest,
                                           block_migration)
    # Steps towards the maximum allowed switch-over downtime; each
    # entry is indexed as (wait seconds, downtime).
    downtime_steps = list(self._migration_downtime_steps(data_gb))
    # Longest time the migration may run before it is aborted.
    completion_timeout = int(
        CONF.libvirt.live_migration_completion_timeout * data_gb)
    # Longest time to wait for the remaining data to decrease.
    progress_timeout = CONF.libvirt.live_migration_progress_timeout

    # What follows is one long if/elif chain that acts on the current
    # state of the migration job.
    n = 0
    start = time.time()
    progress_time = start
    progress_watermark = None
    while True:
        # Fetch the job info of the domain.
        info = host.DomainJobInfo.for_domain(dom)
        if info.type == libvirt.VIR_DOMAIN_JOB_NONE:
            # This type can stand for three situations:
            # 1. the job has not started yet - told apart by whether
            #    the operation thread is still running;
            # 2./3. the job ended because it failed or completed -
            #    told apart by whether the VM still runs on this host.
            if not finish_event.ready():
                # Job has not started yet.
                LOG.debug("Operation thread is still"
                          " running", instance=instance)
            else:
                try:
                    # Still active on this host: the migration failed.
                    if guest.is_active():
                        LOG.debug("VM running on src, "
                                  "migration failed",
                                  instance=instance)
                        info.type = libvirt.VIR_DOMAIN_JOB_FAILED
                    # Otherwise the migration completed.
                    else:
                        LOG.debug("VM is shutoff, migration "
                                  "finished", instance=instance)
                        info.type = libvirt.VIR_DOMAIN_JOB_COMPLETED
                except libvirt.libvirtError as ex:
                    # %(ex)s is a named placeholder, so the argument
                    # must be a mapping rather than a bare positional.
                    LOG.debug("Error checking domain "
                              "status %(ex)s", {"ex": ex},
                              instance=instance)
                    # "No such domain" means the VM is gone from this
                    # host: the migration completed.
                    if (ex.get_error_code() ==
                            libvirt.VIR_ERR_NO_DOMAIN):
                        LOG.debug("VM is missing, migration "
                                  "finished", instance=instance)
                        info.type = libvirt.VIR_DOMAIN_JOB_COMPLETED
                    # Any other error counts as a failed migration.
                    else:
                        LOG.info(_LI("Error %(ex)s, "
                                     "migration failed"),
                                 {"ex": ex},
                                 instance=instance)
                        info.type = libvirt.VIR_DOMAIN_JOB_FAILED

        # Migration has not started yet.
        if info.type == libvirt.VIR_DOMAIN_JOB_NONE:
            LOG.debug("Migration not running yet",
                      instance=instance)
        # Migration is in progress.
        elif info.type == libvirt.VIR_DOMAIN_JOB_UNBOUNDED:
            now = time.time()
            elapsed = now - start
            abort = False

            # Remaining data decreased (or first sample): record the
            # new low-water mark and when it was seen.
            if ((progress_watermark is None) or
                    (progress_watermark > info.data_remaining)):
                progress_watermark = info.data_remaining
                progress_time = now

            # No progress for longer than the configured timeout
            # (0 disables the check): abort.
            if (progress_timeout != 0 and
                    (now - progress_time) > progress_timeout):
                LOG.warn(_LW("Live migration stuck for %d"
                             " sec"), (now - progress_time),
                         instance=instance)
                abort = True

            # Running longer than the completion timeout
            # (0 disables the check): abort.
            if (completion_timeout != 0 and
                    elapsed > completion_timeout):
                LOG.warn(_LW("Live migration not completed "
                             "after %d sec"), completion_timeout,
                         instance=instance)
                abort = True

            # Abort the migration job.
            if abort:
                try:
                    dom.abortJob()
                except libvirt.libvirtError as e:
                    LOG.warn(_LW("Failed to abort migration "
                                 "%s"), e, instance=instance)
                    raise

            # See if we need to increase the max downtime. We
            # ignore failures, since we'd rather continue trying
            # to migrate.
            if (len(downtime_steps) > 0 and
                    elapsed > downtime_steps[0][0]):
                downtime = downtime_steps.pop(0)
                LOG.info(_LI("Increasing downtime to "
                             "%(downtime)dms after %(waittime)d sec "
                             "elapsed time"),
                         {"downtime": downtime[1],
                          "waittime": downtime[0]},
                         instance=instance)

                try:
                    dom.migrateSetMaxDowntime(downtime[1])
                except libvirt.libvirtError as e:
                    LOG.warn(_LW("Unable to increase max "
                                 "downtime to %(time)d ms: %(e)s"),
                             {"time": downtime[1], "e": e},
                             instance=instance)

            # Every 10th pass (about 5s at 0.5s per pass): update the
            # progress and pick the log level.
            if (n % 10) == 0:
                # Progress is based on the memory still to be copied.
                remaining = 100
                if info.memory_total != 0:
                    remaining = round(info.memory_remaining *
                                      100 / info.memory_total)
                instance.progress = 100 - remaining
                instance.save()

                # Log at info level every 60th pass (about 30s),
                # at debug level otherwise.
                lg = LOG.debug
                if (n % 60) == 0:
                    lg = LOG.info

                # NOTE: the actual log statement using ``lg`` is
                # omitted from this excerpt.

            n = n + 1
        # Migration completed.
        elif info.type == libvirt.VIR_DOMAIN_JOB_COMPLETED:
            # In this walkthrough post_method is
            # ComputeManager._post_live_migration, which does the
            # clean-up work.
            post_method(context, instance, dest,
                        block_migration,
                        migrate_data)
            break
        # Migration failed.
        elif info.type == libvirt.VIR_DOMAIN_JOB_FAILED:
            # In this walkthrough recover_method is
            # ComputeManager._rollback_live_migration, which rolls
            # the migration back.
            recover_method(context, instance, dest,
                           block_migration,
                           migrate_data)
            break
        # Migration was cancelled.
        elif info.type == libvirt.VIR_DOMAIN_JOB_CANCELLED:
            # Roll back, same as for a failed migration.
            recover_method(context, instance, dest,
                           block_migration,
                           migrate_data)
            break
        else:
            LOG.warn(_LW("Unexpected migration job type: %d"),
                     info.type, instance=instance)

        # Sleep 0.5s before polling again.
        time.sleep(0.5)

小结:一个大循环在不停的监视迁移状态,如果发生错误则退出;如果迁移完成就调用_post_live_migration 执行扫尾工作,如果迁移失败或者被取消就调用_rollback_live_migration执行回滚操作。

下一篇博文将分析complete完成阶段,敬请期待!!!