Neutron分析(3)—— neutron-l3-agent

时间:2023-01-19 07:43:58

一.Layer-3 Networking Extension

neutron l3作为一种API扩展,向租户提供了路由和NAT功能。

l3扩展包含两种资源:

  • router:在不同内部子网中转发数据包;通过指定内部网关做NAT。每一个子网对应router上的一个端口,这个端口的ip就是子网的网关。
  • floating ip:代表一个外部网络的IP,映射到内部网络的端口上。当网络的router:external属性为True时,floating ip才能定义。

这两种资源都对应有不同的属性。支持CRUD操作。

二.代码分析

既然neutron中支持了l3扩展,那么怎样通过API来创建router或者floating ip,以提供路由以及NAT的功能的呢?

主要有以下几个步骤:
1.租户通过horizon,nova命令或者自定义的脚本,发送与router或floating ip相关的操作。
2.这些API请求发送到neutron server,通过neutron提供的API extension相对应。
3.实现这些API extension的操作,比如说create_router,则由具体的plugin和database来共同完成。
4.plugin会通过rpc机制与计算网络节点上运行的l3 agent来执行l3 转发和NAT的功能。

l3.py

源代码目录:neutron/extensions/l3.py

class RouterPluginBase(object):

    @abc.abstractmethod
def create_router(self, context, router):
pass @abc.abstractmethod
def update_router(self, context, id, router):
pass @abc.abstractmethod
def get_router(self, context, id, fields=None):
pass @abc.abstractmethod
def delete_router(self, context, id):
pass @abc.abstractmethod
def get_routers(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
pass @abc.abstractmethod
def add_router_interface(self, context, router_id, interface_info):
pass @abc.abstractmethod
def remove_router_interface(self, context, router_id, interface_info):
pass @abc.abstractmethod
def create_floatingip(self, context, floatingip):
pass @abc.abstractmethod
def update_floatingip(self, context, id, floatingip):
pass @abc.abstractmethod
def get_floatingip(self, context, id, fields=None):
pass @abc.abstractmethod
def delete_floatingip(self, context, id):
pass @abc.abstractmethod
def get_floatingips(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass def get_routers_count(self, context, filters=None):
raise NotImplementedError() def get_floatingips_count(self, context, filters=None):
raise NotImplementedError()
这个模块中,class RouterPluginBase定义了plugin中需要实现的方法。

l3_db.py

源码目录:/neutron/db/l3_db.py
这个模块中,class L3_NAT_db_mixin继承了上面l3模块的class RouterPluginBase,因此在RouterPluginBase中定义的抽象方法就要在这里实现了。
类注释中写道,Mixin class to add L3/NAT router methods to db_plugin_base_v2。

在类的开始,有这样一段代码:

@property
def l3_rpc_notifier(self):
if not hasattr(self, '_l3_rpc_notifier'):
self._l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
return self._l3_rpc_notifier
说明l3_rpc_notifier,是模块l3_rpc_agent_api中类L3AgentNotifyAPI的一个实例。

l3_rpc_agent_api模块源码在/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py。

class L3AgentNotifyAPI(n_rpc.RpcProxy):
"""API for plugin to notify L3 agent."""
BASE_RPC_API_VERSION = '1.0' def __init__(self, topic=topics.L3_AGENT):
super(L3AgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION) def _notification_host(self, context, method, payload, host):
"""Notify the agent that is hosting the router.""" ... def _agent_notification(self, context, method, router_ids,
operation, data):
"""Notify changed routers to hosting l3 agents."""
...
def _notification(self, context, method, router_ids, operation, data):
"""Notify all the agents that are hosting the routers."""
...def _notification_fanout(self, context, method, router_id):
"""Fanout the deleted router to all L3 agents."""
...def agent_updated(self, context, admin_state_up, host):
self._notification_host(context, 'agent_updated',
{'admin_state_up': admin_state_up},
host) def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id) def routers_updated(self, context, router_ids, operation=None, data=None):
if router_ids:
self._notification(context, 'routers_updated', router_ids,
operation, data) def router_removed_from_agent(self, context, router_id, host):
self._notification_host(context, 'router_removed_from_agent',
{'router_id': router_id}, host) def router_added_to_agent(self, context, router_ids, host):
self._notification_host(context, 'router_added_to_agent',
router_ids, host)

这个类主要用于plugin发送rpc通知给l3 agent。

rpc处理

在上面的l3_db.py中,会将涉及router和floating ip的处理读取或者写入到数据中。但是还有一些操作不仅如此,还需要通过rpc(通过调用l3_rpc_agent_api中的函数,这些操作大部分会去 调用routers_updated),通知l3 agent进行处理。

这些需要处理的地方包括:update_router,delete_router,add_router_interface,remove_router_interface,create_floatingip,update_floatingip,delete_floatingip,disassociate_floatingips

等操作。

l3_agent.py

源码目录:neutron/agent/l3_agent.py

l3 agent使用Linux ip协议栈和iptables来实现router和NAT的功能。

这时候,如果在horizon的界面创建一个路由,不进行任何操作的话,plugin只会操作数据库,l3 agent不会作处理。而当update router,如设置外部网关时,l3才会去处理请求。

l3 agent使用service框架启动服务,其manager类为neutron.agent.l3_agent.L3NATAgentWithStateReport,该类继承自L3NATAgent,主要实现了基于rpc的_report_state向PluginReportStateAPI(topic为q-plugin)汇报状态信息,这些信息由各个plugin来处理(比如ml2中通过start_rpc_listeners来注册该topic的消费者)。

L3NATAgent类是最主要的L3 Manager类,该类继承关系为 class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager) ;FWaaSL3AgentRpcCallback主要是加载防火墙驱动,并创建RPC与Plugin通信。

再来看L3NATAgent的创建过程:

   def __init__(self, host, conf=None):
if conf:
self.conf = conf
else:
self.conf = cfg.CONF
self.root_helper = config.get_root_helper(self.conf)
self.router_info = {} self._check_config_params() try:
       # import driver from l3_agent.init
# Example: interface_driver = neutron.agent.linux.interface.OVSInterfaceDriver
self.driver = importutils.import_object(
self.conf.interface_driver,
self.conf
)
except Exception:
msg = _("Error importing interface driver "
"'%s'") % self.conf.interface_driver
LOG.error(msg)
raise SystemExit(1) self.context = context.get_admin_context_without_session()
# Agent side of the l3 agent RPC API, topic is 'q-l3-plugin'
self.plugin_rpc
= L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
self.updated_routers = set()
self.removed_routers = set()
self.sync_progress = False self._clean_stale_namespaces = self.conf.use_namespaces
# Start RPC Loop
self.rpc_loop
= loopingcall.FixedIntervalLoopingCall(
self._rpc_loop)
self.rpc_loop.start(interval=
RPC_LOOP_INTERVAL)
super(L3NATAgent, self).__init__(conf=self.conf) self.target_ex_net_id = None

上面的self.plugin_rpc会处理neutron-server转发过来的请求,这个请求是通过service_plugins的方式处理的:

neutron.service_plugins =
dummy = neutron.tests.unit.dummy_plugin:DummyServicePlugin
router = neutron.services.l3_router.l3_router_plugin:L3RouterPlugin
firewall = neutron.services.firewall.fwaas_plugin:FirewallPlugin
lbaas = neutron.services.loadbalancer.plugin:LoadBalancerPlugin
vpnaas = neutron.services.vpn.plugin:VPNDriverPlugin
metering = neutron.services.metering.metering_plugin:MeteringPlugin

self.rpc_loop会循环检测从plugin发送过来的rpc请求:

  @lockutils.synchronized('l3-agent', 'neutron-')
def _rpc_loop(self):
# _rpc_loop and _sync_routers_task will not be
# executed in the same time because of lock.
# so we can clear the value of updated_routers
# and removed_routers, but they can be updated by
# updated_routers and removed_routers rpc call
try:
LOG.debug(_("Starting RPC loop for %d updated routers"),
len(self.updated_routers))
if self.updated_routers: # 保存了需要本次处理的router信息
# We're capturing and clearing the list, and will
# process the "captured" updates in this loop,
# and any updates that happen due to a context switch
# will be picked up on the next pass.
updated_routers = set(self.updated_routers)
self.updated_routers.clear()
router_ids = list(updated_routers)
routers = self.plugin_rpc.get_routers(
self.context, router_ids)
# routers with admin_state_up=false will not be in the fetched
fetched = set([r['id'] for r in routers])
#不在fetched中而在updated_routers中,说明需删除
self.removed_routers.update(updated_routers - fetched) self._process_routers(routers)
self._process_router_delete()

LOG.debug(_("RPC loop successfully completed"))
except Exception:
LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True

_process_routers

如果有rpc请求过来,即需要更新路由信息,或者添加路由子接口,创建floating ip等操作,都会在这里执行。这个函数里会去调用_process_routers函数,在_process_routers函数中会去创建绿色线程,执行process_router函数。可以说,l3 agent调用网络设备的工作都会在process_router中进行。

    def process_router(self, ri):
ri.iptables_manager.defer_apply_on()
ex_gw_port = self._get_ex_gw_port(ri)
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
existing_port_ids = set([p['id'] for p in ri.internal_ports])
current_port_ids = set([p['id'] for p in internal_ports
if p['admin_state_up']])
new_ports = [p for p in internal_ports if
p['id'] in current_port_ids and
p['id'] not in existing_port_ids]
old_ports = [p for p in ri.internal_ports if
p['id'] not in current_port_ids]
for p in new_ports:
self._set_subnet_info(p)
self.internal_network_added(ri, p['network_id'], p['id'],
p['ip_cidr'], p['mac_address'])
ri.internal_ports.append(p) for p in old_ports:
self.internal_network_removed(ri, p['id'], p['ip_cidr'])
ri.internal_ports.remove(p) existing_devices = self._get_existing_devices(ri)
current_internal_devs = set([n for n in existing_devices
if n.startswith(INTERNAL_DEV_PREFIX)])
current_port_devs = set([self.get_internal_device_name(id) for
id in current_port_ids])
stale_devs = current_internal_devs - current_port_devs
for stale_dev in stale_devs:
LOG.debug(_('Deleting stale internal router device: %s'),
stale_dev)
self.driver.unplug(stale_dev,
namespace=ri.ns_name,
prefix=INTERNAL_DEV_PREFIX) # Get IPv4 only internal CIDRs
internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports
if netaddr.IPNetwork(p['ip_cidr']).version == 4]
# TODO(salv-orlando): RouterInfo would be a better place for
# this logic too
ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
ri.ex_gw_port and ri.ex_gw_port['id']) interface_name = None
if ex_gw_port_id:
interface_name = self.get_external_device_name(ex_gw_port_id)
if ex_gw_port and ex_gw_port != ri.ex_gw_port:
self._set_subnet_info(ex_gw_port)
self.external_gateway_added(ri, ex_gw_port,
interface_name, internal_cidrs)
elif not ex_gw_port and ri.ex_gw_port:
self.external_gateway_removed(ri, ri.ex_gw_port,
interface_name, internal_cidrs) stale_devs = [dev for dev in existing_devices
if dev.startswith(EXTERNAL_DEV_PREFIX)
and dev != interface_name]
for stale_dev in stale_devs:
LOG.debug(_('Deleting stale external router device: %s'),
stale_dev)
self.driver.unplug(stale_dev,
bridge=self.conf.external_network_bridge,
namespace=ri.ns_name,
prefix=EXTERNAL_DEV_PREFIX) # Process static routes for router
self.routes_updated(ri)
# Process SNAT rules for external gateway
ri.perform_snat_action(self._handle_router_snat_rules,
internal_cidrs, interface_name) # Process SNAT/DNAT rules for floating IPs
fip_statuses = {}
try:
if ex_gw_port:
existing_floating_ips = ri.floating_ips
self.process_router_floating_ip_nat_rules(ri)
ri.iptables_manager.defer_apply_off()
# Once NAT rules for floating IPs are safely in place
# configure their addresses on the external gateway port
fip_statuses = self.process_router_floating_ip_addresses(
ri, ex_gw_port)
except Exception:
# TODO(salv-orlando): Less broad catching
# All floating IPs must be put in error state
for fip in ri.router.get(l3_constants.FLOATINGIP_KEY, []):
fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR if ex_gw_port:
# Identify floating IPs which were disabled
ri.floating_ips = set(fip_statuses.keys())
for fip_id in existing_floating_ips - ri.floating_ips:
fip_statuses[fip_id] = l3_constants.FLOATINGIP_STATUS_DOWN
# Update floating IP status on the neutron server
self.plugin_rpc.update_floatingip_statuses(
self.context, ri.router_id, fip_statuses) # Update ex_gw_port and enable_snat on the router info cache
ri.ex_gw_port = ex_gw_port
ri.enable_snat = ri.router.get('enable_snat')
process_router函数所做的工作有:

1.处理内部接口

这个是在router添加和删除子接口时工作。它会调用internal_network_added和internal_network_removed这个两个函数。

在internal_network_added和internal_network_removed这个两个函数会去调用OVSInterfaceDriver的plug和unplug
函数,这两个函数最终会用ip linkip addr的命令去处理接口和ip地址。

2.处理外部网关
router添加和删除外部网关。调用external_gateway_added和external_gateway_removed函数,同样也会调用plug和unplug函数,用ip linkip addr的命令进行最终处理

3.为外部网关做SNAT

调用_handle_router_snat_rules函数,使用iptables来加链和删除链。

在我的测试网络中,router上有3个接口,外部网关地址为192.168.39.2,内部两个子网的网关为10.1.0.1,10.2.0.1。iptables规则如下:

1
2
3
iptables -t nat -A POSTROUTING ! -i qg-fcb1a762-1f ! -o qg-fcb1a762-1f -m conntrack ! --ctstate DNAT -j ACCEPT
iptables -t nat -A snat -s 10.2.0.1/24 -j SNAT --to-source 192.168.39.2
iptables -t nat -A snat -s 10.1.0.1/24 -j SNAT --to-source 192.168.39.2

qg-fcb1a762-1f为外部网关接口的索引,使用ip netns exec $namespace ip link list可查看。

4.为floating ip做SNAT/DNAT

和浮动IP相关,如创建,更新,删除,绑定到一个云主机的接口,解绑定等。

不同neutron版本这部分的处理不同,这里是基于Icehouse rc1版本的,在havava stable版本,只有一个函数来处理iptables规则和floating ip。

process_router_floating_ip_nat_rules :当floating ip与云主机绑定时,会先清除已有的floating_ip规则,再加上要添加的iptables规则,同时重新加载清除的iptables规则。

比如,一个云主机10.1.0.2上绑定了一个floating ip(192.168.39.5)。那么最终会在iptable不同的链中添加iptables规则,float-snat为neutron自定义链。

1
2
3
iptables -t nat -A PREROUTING -d 192.168.39.5 -j DNAT --to 10.1.0.2
iptables -t nat -A OUTPUT -d 192.168.39.5 -j DNAT --to 10.1.0.2
iptables -t nat -A float-snat -s 10.1.0.2 -j SNAT --to 192.168.39.5

process_router_floating_ip_addresses:

将floating ip和云主机绑定时,使用ip addr add命令添加ip地址。
解除floating ip和云主机绑定时,使用ip addr del命令将floating ip删除。

类图

Neutron分析(3)—— neutron-l3-agent

本文转自http://squarey.me/cloud-virtualization/neutron-l3-analyse.html,有部分删改。

Neutron分析(3)—— neutron-l3-agent的更多相关文章

  1. Neutron路由篇:L3 agent+Namespace

    Neutron 的路由服务是由 l3 agent 提供的. 除此之外,l3 agent 通过 iptables 提供 firewall 和 floating ip 服务.     l3 agent 需 ...

  2. 理解 OpenStack 高可用(HA)(2):Neutron L3 Agent HA 之 虚拟路由冗余协议(VRRP)

    本系列会分析OpenStack 的高可用性(HA)概念和解决方案: (1)OpenStack 高可用方案概述 (2)Neutron L3 Agent HA - VRRP (虚拟路由冗余协议) (3)N ...

  3. Neutron分析(5)—— neutron-l3-agent中的iptables

    一.iptables简介 1.iptables数据包处理流程 以本机为目的的包,由上至下,走左边的路 本机产生的包,从local process开始走左边的路 本机转发的包,由上至下走右边的路 简化流 ...

  4. Neutron 理解 (6): Neutron 是怎么实现虚拟三层网络的 [How Neutron implements virtual L3 network]

    学习 Neutron 系列文章: (1)Neutron 所实现的虚拟化网络 (2)Neutron OpenvSwitch + VLAN 虚拟网络 (3)Neutron OpenvSwitch + GR ...

  5. Neutron分析(7)—— neutron-l3-agent HA solutions

    1. keepalived vrrp/conntrackd High availability features will be implemented as extensions or driver ...

  6. openstack Neutron分析(3)—— neutron-dhcp-agent源码分析

    1.neutron dhcp3个主要部件分别为什么?2.dhcp模块包含哪些内容?3.Dnsmasq配置文件是如何创建和更新的?4.DHCP agent的信息存放在neutron数据库的哪个表中? 扩 ...

  7. Neutron分析(2)——neutron-server启动过程分析

    neutron-server启动过程分析 1. /etc/init.d/neutron-server DAEMON=/usr/bin/neutron-server DAEMON_ARGS=" ...

  8. Neutron 理解 (1): Neutron 所实现的虚拟化网络 [How Netruon Virtualizes Network]

    学习 Neutron 系列文章: (1)Neutron 所实现的虚拟化网络 (2)Neutron OpenvSwitch + VLAN 虚拟网络 (3)Neutron OpenvSwitch + GR ...

  9. Neutron 理解 (4): Neutron OVS OpenFlow 流表 和 L2 Population [Netruon OVS OpenFlow tables + L2 Population]

    学习 Neutron 系列文章: (1)Neutron 所实现的虚拟化网络 (2)Neutron OpenvSwitch + VLAN 虚拟网络 (3)Neutron OpenvSwitch + GR ...

随机推荐

  1. union和union all 合并查询

    union联合查询 SELECT TOP ID,oTitle Title,oInfo Description,Pic Images AND UpTime > dateadd(day,-,UpTi ...

  2. ZeroMQ接口函数之 :zmq_msg_copy - 把一个消息的内容复制到另一个消息中

    ZeroMQ 官方地址 :http://api.zeromq.org/4-1:zmq_msg_copy zmq_msg_copy(3)   ØMQ Manual - ØMQ/3.2.5 Name zm ...

  3. AngularJs angular.equals

    angular.equals 对比两个对象/值是否相等.支持值类型.正则表达式.数组和对象. 如果下列至少有一个是正确的,则将两个对象/值视为相等. 两个对象/值能通过===比较. 两个对象/值是同一 ...

  4. jsp_属性范围_page

    page属性范围(使用pageContext表示,但是一般习惯于将这种范围称为page范围)表示将一个属性设置在本页上,页面跳转之后无法取得. 下面我们来写两个小例子测试一下: 1.在同一个jsp页面 ...

  5. SequoiaDB 系列源码分析调整

    犹豫我经验尚不够丰富,有大牛跟我说,以我这样定下的结构来分析源码,学习效果不太好. 应该先从程序的进程入口函数开始,慢慢的跟流程来分析.先通过系统的启动.退出来分析所用到的技术,像进程模型,线程模型等 ...

  6. mysql新建用户本地无法登录

    mysql新建用户本地无法登录 MySQLDebianGoogleAccess  出此是用mysql,因为root权限过高,所以新建一用户appadmin,权限仅为要用到的数据库.创建语句如下:gra ...

  7. javaee学习-JSP指令简介

    JSP指令(directive)是为JSP引擎而设计的,它们并不直接产生任何可见输出,而只是告诉引擎如何处理JSP页面中的其余部分. 在JSP 2.0规范*定义了三个指令: page指令 Inclu ...

  8. builds error

    使用cocoapods 出现Undefined symbols for architecture i386: _OBJC_CLASS_$_XXXXXXX", referenced from: ...

  9. APP IM 之 XMPP和Jabber及选择方案

    1. 概述 IM ,InstantMessaging,即时通信. 现在,市面上有一批提供即时通信功能的公司.如:全时.云之讯(IM无语音和视频).容联云通讯(支持点对点音视频,按照消息的存储空间收费) ...

  10. RMI方式Ehcache集群的源码分析

    Ehcache不仅支持基本的内存缓存,还支持多种方式将本地内存中的缓存同步到其他使用Ehcache的服务器中,形成集群.如下图所示: Ehcache支持多种集群方式,下面以RMI通信方式为例,来具体分 ...