Code Implementation of DVR in Neutron

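The DVR code can be read in the following order:
1. The code in q-svc that sets up DVR
2. The code in q-l3 that fetches router information and builds the DVR
3. The code in q-agt that installs the flow rules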

The main code is walked through below:

1. The code in q-svc that sets up DVR:
Start with the q-svc side. The constructor of the L3 plugin contains:

    def __init__(self):
        self.setup_rpc()
        self.router_scheduler = importutils.import_object(
            cfg.CONF.router_scheduler_driver)
        self.start_periodic_l3_agent_status_check()
        super(L3RouterPlugin, self).__init__()
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()

Here l3_dvrscheduler_db subscribes to several events:

def subscribe():
    registry.subscribe(
        _notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE)
    registry.subscribe(
        _notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE)
    registry.subscribe(
        _notify_port_delete, resources.PORT, events.AFTER_DELETE)

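_notify_l3_agent_new_port works as follows: if a port is created or updated and it is a DVR interface, add_arp_entry is sent to the l3 agent, which updates the corresponding ARP entries. It also calls routers_updated to tell the l3 agent that the router has changed.
_notify_port_delete works as follows: del_arp_entry is sent to the l3 agent, which updates the corresponding ARP entries. If the router is no longer needed once the port is gone, remove_router_from_l3_agent is called to remove the router.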
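
The subscribe/notify pattern described above can be illustrated with a minimal sketch. This is not Neutron's callback registry; `Registry`, `sent`, `notify_new_port`, `notify_port_delete` and the string constants are hypothetical stand-ins, and the "compute:" check is a simplified assumption about which ports count as DVR interfaces.

```python
from collections import defaultdict

# Hypothetical stand-ins for resources.PORT and events.AFTER_*.
PORT = "port"
AFTER_CREATE = "after_create"
AFTER_DELETE = "after_delete"


class Registry:
    """Toy callback registry keyed by (resource, event)."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def subscribe(self, callback, resource, event):
        self._callbacks[(resource, event)].append(callback)

    def notify(self, resource, event, trigger, **kwargs):
        # Call every callback registered for this (resource, event) pair.
        for callback in self._callbacks[(resource, event)]:
            callback(resource, event, trigger, **kwargs)


registry = Registry()
sent = []  # records the messages that would go to the l3 agent


def notify_new_port(resource, event, trigger, **kwargs):
    port = kwargs["port"]
    # Assumption: only VM ports are treated as DVR interfaces here.
    if port["device_owner"].startswith("compute:"):
        sent.append(("add_arp_entry", port["id"]))
        sent.append(("routers_updated", port["id"]))


def notify_port_delete(resource, event, trigger, **kwargs):
    sent.append(("del_arp_entry", kwargs["port"]["id"]))


registry.subscribe(notify_new_port, PORT, AFTER_CREATE)
registry.subscribe(notify_port_delete, PORT, AFTER_DELETE)

vm_port = {"id": "p1", "device_owner": "compute:nova"}
registry.notify(PORT, AFTER_CREATE, None, port=vm_port)
registry.notify(PORT, AFTER_DELETE, None, port=vm_port)
```

The point of the pattern is that the port code never calls the L3 code directly; it only publishes an event, and whoever subscribed reacts.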

Next, look at create_router in q-svc:

    def create_router(self, context, router):
        is_ha = self._is_ha(router['router'])

        if is_ha and l3_dvr_db.is_distributed_router(router['router']):
            raise l3_ha.DistributedHARouterNotSupported()

        router['router']['ha'] = is_ha
        router_dict = super(L3_HA_NAT_db_mixin,
                            self).create_router(context, router)

        if is_ha:
            try:
                router_db = self._get_router(context, router_dict['id'])
                ha_network = self.get_ha_network(context,
                                                 router_db.tenant_id)
                if not ha_network:
                    ha_network = self._create_ha_network(context,
                                                         router_db.tenant_id)

                self._set_vr_id(context, router_db, ha_network)
                self._create_ha_interfaces(context, router_db, ha_network)
                self._notify_ha_interfaces_updated(context, router_db.id)
            except Exception:
                with excutils.save_and_reraise_exception():
                    self.delete_router(context, router_dict['id'])
            router_dict['ha_vr_id'] = router_db.extra_attributes.ha_vr_id
        return router_dict

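The `if is_ha` branch was analyzed earlier in the HaRouter article. Nothing in this code is specific to DVR, so at this point the router information stored in the DB is the same as for a legacy router.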

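2. The code in q-l3 that fetches router information and builds the DVR:
Because q-l3 follows a producer-consumer pattern, start with the DVR-related code on the producer side. The DVR-related entry point in sync_routers is: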

        elif utils.is_extension_supported(
                self.l3plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
            if cfg.CONF.router_auto_schedule:
                self.l3plugin.auto_schedule_routers(context, host, router_ids)
            routers = (
                self.l3plugin.list_active_sync_routers_on_active_l3_agent(
                    context, host, router_ids))

This is the same as for HaRouter. auto_schedule_routers binds routers that are not yet bound to an l3 agent. However, its code contains the following:

    def _get_routers_to_schedule(self, context, plugin,
                                 router_ids=None, exclude_distributed=False):
        """Verify that the routers specified need to be scheduled.

        :param context: the context
        :param plugin: the core plugin
        :param router_ids: the list of routers to be checked for scheduling
        :param exclude_distributed: whether or not to consider dvr routers
        :returns: the list of routers to be scheduled
        """
        if router_ids is not None:
            routers = plugin.get_routers(context, filters={'id': router_ids})
            unscheduled_routers = self._filter_unscheduled_routers(
                context, plugin, routers)
        else:
            unscheduled_routers = self._get_unscheduled_routers(context,
                                                                plugin)

        if exclude_distributed:
            unscheduled_routers = [
                r for r in unscheduled_routers if not r.get('distributed')
            ]
        return unscheduled_routers

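A router that is not bound is placed in unscheduled_routers. When exclude_distributed is set, however, DVR routers are filtered back out of that list. So a DVR router is not auto-scheduled to any l3 agent, even if it is not bound to one. When, then, does a DVR router become associated with an l3 agent? There are two cases:
1. A VM exists on a host running an l3 agent, and the VM's subnet is attached to a DVR router
2. When the DVR router has a gateway, the central node binds the DVR router
As the code at the beginning showed, the L3 plugin registers several events during initialization, so whenever a port is created a message is sent to the l3 agent. _notify_l3_agent_new_port calls the following code: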

    def routers_updated(self, context, router_ids, operation=None, data=None,
                        shuffle_agents=False):
        if router_ids:
            self._notification(context, 'routers_updated', router_ids,
                               operation, shuffle_agents)

Note self._notification here: besides sending the notification, it also does extra work that does not look like notification at all (a pitfall):

    def _notification(self, context, method, router_ids, operation,
                      shuffle_agents):
        """Notify all the agents that are hosting the routers."""
        plugin = manager.NeutronManager.get_service_plugins().get(
            service_constants.L3_ROUTER_NAT)
        if not plugin:
            LOG.error(_LE('No plugin for L3 routing registered. Cannot notify '
                          'agents with the message %s'), method)
            return
        if utils.is_extension_supported(
                plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
            adminContext = (context.is_admin and
                            context or context.elevated())
            plugin.schedule_routers(adminContext, router_ids)
            self._agent_notification(
                context, method, router_ids, operation, shuffle_agents)
        else:
            cctxt = self.client.prepare(fanout=True)
            cctxt.cast(context, method, routers=router_ids)

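If the plugin supports constants.L3_AGENT_SCHEDULER_EXT_ALIAS, it calls plugin.schedule_routers(adminContext, router_ids), which binds the router to an l3 agent. This call would be easier to find as a separate step; buried inside the notification method, it is hard to spot without tracing the code.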

Now look at schedule_routers. The default scheduler is ChanceScheduler, which picks a host for the router at random. The allocation code is:

    def _schedule_router(self, plugin, context, router_id,
                         candidates=None):
        sync_router = plugin.get_router(context, router_id)
        router_distributed = sync_router.get('distributed', False)
        if router_distributed:
            # For Distributed routers check for SNAT Binding before
            # calling the schedule_snat_router
            snat_bindings = plugin.get_snat_bindings(context, [router_id])
            router_gw_exists = sync_router.get('external_gateway_info', False)
            if not snat_bindings and router_gw_exists:
                # If GW exists for DVR routers and no SNAT binding
                # call the schedule_snat_router
                return plugin.schedule_snat_router(
                    context, router_id, sync_router)
            if not router_gw_exists and snat_bindings:
                # If DVR router and no Gateway but SNAT Binding exists then
                # call the unbind_snat_servicenode to unbind the snat service
                # from agent
                plugin.unbind_snat_servicenode(context, router_id)
                return
        candidates = candidates or self._get_candidates(
            plugin, context, sync_router)
        if not candidates:
            return
        if router_distributed:
            for chosen_agent in candidates:
                self.bind_router(context, router_id, chosen_agent)
        elif sync_router.get('ha', False):
            chosen_agents = self._bind_ha_router(plugin, context,
                                                 router_id, candidates)
            if not chosen_agents:
                return
            chosen_agent = chosen_agents[-1]
        else:
            chosen_agent = self._choose_router_agent(
                plugin, context, candidates)
            self.bind_router(context, router_id, chosen_agent)
        return chosen_agent

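The code first checks whether the DVR router already has an l3 agent acting as its SNAT node (agent_mode=dvr_snat). If it has a gateway but no SNAT binding, plugin.schedule_snat_router performs the binding, which simply inserts a row into the DB. For ordinary nodes (agent_mode=dvr), the scheduler checks whether the host has a DVR-serviced port on one of the router's subnets. Hosts without such a port do not get the router, which avoids creating many useless router instances. For example, in an environment with 100 compute nodes where a tenant has requested only one router and one VM, there is no need to create the router on all 100 compute nodes. It only has to exist on the compute node hosting the VM and on the dvr_snat node, which saves up to 99 router instances: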

            elif is_router_distributed and agent_mode.startswith(
                constants.L3_AGENT_MODE_DVR) and (
                self.check_ports_exist_on_l3agent(
                    context, l3_agent, sync_router['id'])):
                candidates.append(l3_agent)

            ......

    def check_ports_exist_on_l3agent(self, context, l3_agent, router_id):
        """
        This function checks for existence of dvr serviceable
        ports on the host, running the input l3agent.
        """
        subnet_ids = self.get_subnet_ids_on_router(context, router_id)
        if not subnet_ids:
            return False

        core_plugin = manager.NeutronManager.get_plugin()
        # NOTE(swami):Before checking for existence of dvr
        # serviceable ports on the host managed by the l3
        # agent, let's verify if at least one subnet has
        # dhcp enabled. If so, then the host will have a
        # dvr serviceable port, which is in fact the DHCP
        # port.
        # This optimization is valid assuming that the L3
        # DVR_SNAT node will be the one hosting the DHCP
        # Agent.
        agent_conf = self.get_configuration_dict(l3_agent)
        agent_mode = agent_conf.get(constants.L3_AGENT_MODE,
                                    constants.L3_AGENT_MODE_LEGACY)

        for subnet_id in subnet_ids:
            subnet_dict = core_plugin.get_subnet(context, subnet_id)
            if (subnet_dict['enable_dhcp'] and (
                agent_mode == constants.L3_AGENT_MODE_DVR_SNAT)):
                return True

        filter = {'fixed_ips': {'subnet_id': subnet_ids}}
        ports = core_plugin.get_ports(context, filters=filter)
        for port in ports:
            if (n_utils.is_dvr_serviced(port['device_owner']) and
                l3_agent['host'] == port['binding:host_id']):
                    return True

        return False

        ......

    def is_dvr_serviced(device_owner):
        """Check if the port need to be serviced by DVR

        Helper function to check the device owners of the
        ports in the compute and service node to make sure
        if they are required for DVR or any service directly or
        indirectly associated with DVR.
        """
        dvr_serviced_device_owners = (q_const.DEVICE_OWNER_LOADBALANCER,
                                      q_const.DEVICE_OWNER_DHCP)
        return (device_owner.startswith('compute:') or
                device_owner in dvr_serviced_device_owners)

This filtering yields the full list of candidates. For each candidate, self.bind_router(context, router_id, chosen_agent) is called to bind the l3 agent to the DVR router.
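
The 100-node example can be made concrete with a small sketch. It is a simplification, not Neutron's scheduler: `pick_dvr_candidates` is a hypothetical helper, the dvr_snat agent is assumed to always host the router, and the device-owner strings are assumed values for the constants used in is_dvr_serviced.

```python
def is_dvr_serviced(device_owner):
    # Assumed values for DEVICE_OWNER_LOADBALANCER and DEVICE_OWNER_DHCP.
    serviced = ("neutron:LOADBALANCER", "network:dhcp")
    return device_owner.startswith("compute:") or device_owner in serviced


def pick_dvr_candidates(agents, ports):
    """Return the hosts that should get an instance of the DVR router.

    agents: dicts with 'host' and 'agent_mode' ('dvr' or 'dvr_snat').
    ports:  dicts with 'device_owner' and 'binding:host_id', already
            limited to the subnets attached to the router.
    """
    hosts_with_ports = {
        p["binding:host_id"] for p in ports
        if is_dvr_serviced(p["device_owner"])
    }
    candidates = []
    for agent in agents:
        if agent["agent_mode"] == "dvr_snat":
            # Simplifying assumption: the SNAT node always hosts the router.
            candidates.append(agent["host"])
        elif agent["agent_mode"] == "dvr" and agent["host"] in hosts_with_ports:
            candidates.append(agent["host"])
    return candidates


# 100 compute nodes in dvr mode plus one dvr_snat network node.
agents = [{"host": "compute-%d" % i, "agent_mode": "dvr"} for i in range(100)]
agents.append({"host": "network-0", "agent_mode": "dvr_snat"})

# The tenant has a single VM, running on compute-42.
ports = [{"device_owner": "compute:nova", "binding:host_id": "compute-42"}]

candidates = pick_dvr_candidates(agents, ports)
```

With these inputs only two of the 101 agents are selected, so 99 compute nodes never create the router namespace.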

Now we can go back to the q-l3 code. sync_routers returns the router information through the following code:

    def get_ha_sync_data_for_host(self, context, host=None, router_ids=None,
                                  active=None):
        if n_utils.is_extension_supported(self,
                                          constants.L3_DISTRIBUTED_EXT_ALIAS):
            # DVR has to be handled differently
            agent = self._get_agent_by_type_and_host(context,
                                                     constants.AGENT_TYPE_L3,
                                                     host)
            sync_data = self._get_dvr_sync_data(context, host, agent,
                                                router_ids, active)
        else:
            sync_data = super(L3_HA_NAT_db_mixin, self).get_sync_data(context,
                                                            router_ids, active)
        return self._process_sync_ha_data(context, sync_data, host)

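The information returned here is much the same as for a legacy router or an HaRouter, except that it carries some additional information about special interfaces.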

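Once the l3 agent has the DVR information, it creates a dvr_router.DvrRouter object. Its initialize method is the same as for a regular router, so it only creates the namespace. Its process method is: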

    def process(self, agent):
        ex_gw_port = self.get_ex_gw_port()
        if ex_gw_port:
            self.fip_ns = agent.get_fip_ns(ex_gw_port['network_id'])
            self.fip_ns.scan_fip_ports(self)

        super(DvrRouter, self).process(agent)

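If the router has a gateway, the router object obtains a fip_ns that holds the information about the FIP namespace. In DVR, floating IP traffic is handled locally: traffic that leaves through the local port goes through the fip namespace. The code below collects the port information for that namespace: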

    def scan_fip_ports(self, ri):
        # don't scan if not dvr or count is not None
        if ri.dist_fip_count is not None:
            return

        # scan system for any existing fip ports
        ri.dist_fip_count = 0
        rtr_2_fip_interface = self.get_rtr_ext_device_name(ri.router_id)
        if ip_lib.device_exists(rtr_2_fip_interface, namespace=ri.ns_name):
            device = ip_lib.IPDevice(rtr_2_fip_interface, namespace=ri.ns_name)
            existing_cidrs = [addr['cidr'] for addr in device.addr.list()]
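            # Keep only host CIDRs (one per floating IP); this matches the
            # upstream Neutron code, where common_utils is neutron.common.utils.
            fip_cidrs = [c for c in existing_cidrs if
                         common_utils.is_cidr_host(c)]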
            ri.dist_fip_count = len(fip_cidrs)
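
As a rough illustration of what the count at the end of scan_fip_ports represents, the sketch below counts host addresses among a device's CIDRs. It assumes that floating IPs appear on the rfp device as host addresses (/32 for IPv4) while the link address towards the fip namespace uses a shorter prefix; `count_host_cidrs` and the sample addresses are hypothetical, and the standard-library ipaddress module stands in for Neutron's own helper.

```python
import ipaddress


def count_host_cidrs(cidrs):
    """Count CIDRs whose prefix covers exactly one address."""
    count = 0
    for cidr in cidrs:
        # strict=False accepts CIDRs with host bits set, e.g. 10.0.0.5/24.
        net = ipaddress.ip_network(cidr, strict=False)
        if net.prefixlen == net.max_prefixlen:
            count += 1
    return count


# Hypothetical addresses on an rfp device: one link address, two floating IPs.
existing_cidrs = ["169.254.31.28/31", "172.24.4.3/32", "172.24.4.5/32"]
dist_fip_count = count_host_cidrs(existing_cidrs)
```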

Now look at process. DVR naturally overrides a number of methods. The first overridden method is internal_network_added:

    def internal_network_added(self, port):
        super(DvrRouter, self).internal_network_added(port)

        # NOTE: The following function _set_subnet_arp_info
        # should be called to dynamically populate the arp
        # entries for the dvr services ports into the router
        # namespace. This does not have dependency on the
        # external_gateway port or the agent_mode.
        for subnet in port['subnets']:
            self._set_subnet_arp_info(subnet['id'])

        ex_gw_port = self.get_ex_gw_port()
        if not ex_gw_port:
            return

        snat_ports = self.get_snat_interfaces()
        sn_port = self._map_internal_interfaces(port, snat_ports)
        if not sn_port:
            return

        interface_name = self.get_internal_device_name(port['id'])
        self._snat_redirect_add(sn_port, port, interface_name)

        if not self._is_this_snat_host():
            return

        ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name(self.router['id'])
        interface_name = self.get_snat_int_device_name(sn_port['id'])
        self._internal_network_added(
            ns_name,
            sn_port['network_id'],
            sn_port['id'],
            sn_port['fixed_ips'],
            sn_port['mac_address'],
            interface_name,
            dvr_snat_ns.SNAT_INT_DEV_PREFIX)

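First the regular method is called: it plugs the port whose name starts with qr into the bridge and configures its IP. Next, the route to the SNAT host is set up, that is, the route from qr to sg. This uses separate routing tables; a plain `ip route show` only displays the main table. Finally the sg port is set up, using the usual plug + init_l3.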
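
To show what "a route in a separate table" means in practice, here is a sketch that builds the kind of iproute2 commands involved. It is not the code of _snat_redirect_add; `snat_redirect_commands`, the namespace name, device name, addresses and table number are all hypothetical, and only the `ip rule`, `ip route ... table` and `ip netns exec` syntax is standard.

```python
def snat_redirect_commands(namespace, subnet_cidr, sg_ip, qr_device, table):
    """Build iproute2 commands that send a subnet's traffic to the sg port.

    Returns argument lists; nothing is executed here.
    """
    prefix = ["ip", "netns", "exec", namespace]
    return [
        # Default route via the SNAT (sg) port, stored in its own table.
        prefix + ["ip", "route", "replace", "default", "via", sg_ip,
                  "dev", qr_device, "table", str(table)],
        # Policy rule: packets sourced from the subnet consult that table.
        prefix + ["ip", "rule", "add", "from", subnet_cidr,
                  "lookup", str(table)],
        # The route is only visible when the table is named explicitly.
        prefix + ["ip", "route", "show", "table", str(table)],
    ]


commands = snat_redirect_commands(
    namespace="qrouter-example",   # hypothetical namespace name
    subnet_cidr="10.0.0.0/24",
    sg_ip="10.0.0.5",
    qr_device="qr-example",        # hypothetical device name
    table=100,                     # hypothetical table number
)
printable = [" ".join(cmd) for cmd in commands]
```

Each entry in `printable` can be pasted into a shell on a test host to inspect the behaviour by hand.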

At this point the internal and external routing and the SNAT path are in place. Now for the FIP path:

    def process_external(self, agent):
        ex_gw_port = self.get_ex_gw_port()
        if ex_gw_port:
            self.create_dvr_fip_interfaces(ex_gw_port)
        super(DvrRouter, self).process_external(agent)

The key call is self.create_dvr_fip_interfaces(ex_gw_port), which mainly sets up the FIP namespace and the veth link:

    def create_dvr_fip_interfaces(self, ex_gw_port):
        floating_ips = self.get_floating_ips()
        fip_agent_port = self.get_floating_agent_gw_interface(
            ex_gw_port['network_id'])
        LOG.debug("FloatingIP agent gateway port received from the plugin: "
                  "%s", fip_agent_port)
        is_first = False
        if floating_ips:
            is_first = self.fip_ns.subscribe(self.router_id)
            if is_first and fip_agent_port:
                if 'subnets' not in fip_agent_port:
                    LOG.error(_LE('Missing subnet/agent_gateway_port'))
                else:
                    self.fip_ns.create_gateway_port(fip_agent_port)

        if self.fip_ns.agent_gateway_port and floating_ips:
            if self.dist_fip_count == 0 or is_first:
                self.fip_ns.create_rtr_2_fip_link(self)

                # kicks the FW Agent to add rules for the IR namespace if
                # configured
                self.agent.process_router_add(self)

For the first fip, create_gateway_port creates the fip namespace and sets up the connecting port, the routes and the NAT rules:

    def create_gateway_port(self, agent_gateway_port):
        """Create Floating IP gateway port.

           Request port creation from Plugin then creates
           Floating IP namespace and adds gateway port.
        """
        self.agent_gateway_port = agent_gateway_port

        # add fip-namespace and agent_gateway_port
        self.create()

        iface_name = self.get_ext_device_name(agent_gateway_port['id'])
        self._gateway_added(agent_gateway_port, iface_name)

The base process_external method sets the DNAT rules for the FIP. Note that the name of the gateway port is different here:

    def get_external_device_interface_name(self, ex_gw_port):
        fip_int = self.fip_ns.get_int_device_name(self.router_id)
        if ip_lib.device_exists(fip_int, namespace=self.fip_ns.get_name()):
            return self.fip_ns.get_rtr_ext_device_name(self.router_id)

The port here is the one whose name starts with rfp.

Finally, the implementation of process_floating_ip_nat_rules is the same as for the legacy router.

3. The code in q-agt that installs the flow rules
The constructor of OVSNeutronAgent contains the following DVR-related code:

        self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
            self.context,
            self.dvr_plugin_rpc,
            self.int_br,
            self.tun_br,
            self.bridge_mappings,
            self.phys_brs,
            self.int_ofports,
            self.phys_ofports,
            self.patch_int_ofport,
            self.patch_tun_ofport,
            cfg.CONF.host,
            self.enable_tunneling,
            self.enable_distributed_routing)

        ......

        self.dvr_agent.setup_dvr_flows()

self.dvr_agent.setup_dvr_flows() installs the default DVR rules:

    def setup_dvr_flows(self):
        self.setup_dvr_flows_on_integ_br()
        self.setup_dvr_flows_on_tun_br()
        self.setup_dvr_flows_on_phys_br()
        self.setup_dvr_mac_flows_on_all_brs()

In DVR, every egress packet carries a per-host MAC as its MAC address. This is arranged through the flow rules installed in self.setup_dvr_mac_flows_on_all_brs():

    def setup_dvr_mac_flows_on_all_brs(self):
        if not self.in_distributed_mode():
            LOG.debug("Not in distributed mode, ignoring invocation "
                      "of get_dvr_mac_address_list() ")
            return
        dvr_macs = self.plugin_rpc.get_dvr_mac_address_list(self.context)
        LOG.debug("L2 Agent DVR: Received these MACs: %r", dvr_macs)
        for mac in dvr_macs:
            if mac['mac_address'] == self.dvr_mac_address:
                continue
            for physical_network in self.bridge_mappings:
                self.int_br.add_flow(table=constants.LOCAL_SWITCHING,
                    priority=4,
                    in_port=self.int_ofports[physical_network],
                    dl_src=mac['mac_address'],
                    actions="resubmit(,%s)" %
                    constants.DVR_TO_SRC_MAC_VLAN)
                self.phys_brs[physical_network].add_flow(
                    table=constants.DVR_NOT_LEARN_VLAN,
                    priority=2,
                    dl_src=mac['mac_address'],
                    actions="output:%s" %
                    self.phys_ofports[physical_network])

            if self.enable_tunneling:
                # Table 0 (default) will now sort DVR traffic from other
                # traffic depending on in_port
                self.int_br.add_flow(table=constants.LOCAL_SWITCHING,
                                     priority=2,
                                     in_port=self.patch_tun_ofport,
                                     dl_src=mac['mac_address'],
                                     actions="resubmit(,%s)" %
                                     constants.DVR_TO_SRC_MAC)
                # Table DVR_NOT_LEARN ensures unique dvr macs in the cloud
                # are not learnt, as they may
                # result in flow explosions
                self.tun_br.add_flow(table=constants.DVR_NOT_LEARN,
                                 priority=1,
                                 dl_src=mac['mac_address'],
                                 actions="output:%s" %
                                 self.patch_int_ofport)
            self.registered_dvr_macs.add(mac['mac_address'])
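
The tunneling branch of the method above can be sketched as plain flow strings, which makes it easier to see what ends up on each bridge. `dvr_mac_tunnel_flows` is a hypothetical helper, the port numbers are made up, and the default table numbers are assumptions standing in for constants.LOCAL_SWITCHING, constants.DVR_TO_SRC_MAC and constants.DVR_NOT_LEARN.

```python
def dvr_mac_tunnel_flows(dvr_macs, local_mac, patch_tun_ofport,
                         patch_int_ofport, local_switching=0,
                         dvr_to_src_mac=1, dvr_not_learn=9):
    """Build per-MAC flow strings for br-int and br-tun.

    Table numbers are assumed defaults; pass the real ones if they differ.
    """
    int_br_flows, tun_br_flows = [], []
    for mac in dvr_macs:
        if mac == local_mac:
            # The host's own DVR MAC gets no flow, as in the original loop.
            continue
        # br-int: traffic from a remote DVR MAC arriving over the tunnel
        # patch port is handed to the DVR source-MAC table.
        int_br_flows.append(
            "table=%d,priority=2,in_port=%d,dl_src=%s,actions=resubmit(,%d)"
            % (local_switching, patch_tun_ofport, mac, dvr_to_src_mac))
        # br-tun: remote DVR MACs skip learning and go straight to br-int.
        tun_br_flows.append(
            "table=%d,priority=1,dl_src=%s,actions=output:%d"
            % (dvr_not_learn, mac, patch_int_ofport))
    return int_br_flows, tun_br_flows


int_flows, tun_flows = dvr_mac_tunnel_flows(
    dvr_macs=["fa:16:3f:00:00:01", "fa:16:3f:00:00:02"],
    local_mac="fa:16:3f:00:00:01",
    patch_tun_ofport=3,   # hypothetical ofport numbers
    patch_int_ofport=4,
)
```

One flow per remote host MAC is added on each bridge, so the number of DVR flows grows with the number of hosts rather than with the number of routers or VMs.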

Apart from the constructor, rpc_loop contains no DVR-related code.
