Code Analysis of the HaRouter Implementation in Neutron

This post walks through the code implementation of L3 HA.

Let's start from create router. The server-side entry point is:

    def create_router(self, context, router):
        is_ha = self._is_ha(router['router'])

        if is_ha and l3_dvr_db.is_distributed_router(router['router']):
            raise l3_ha.DistributedHARouterNotSupported()

        router['router']['ha'] = is_ha
        router_dict = super(L3_HA_NAT_db_mixin,
                            self).create_router(context, router)

        if is_ha:
            try:
                router_db = self._get_router(context, router_dict['id'])
                ha_network = self.get_ha_network(context,
                                                 router_db.tenant_id)
                if not ha_network:
                    ha_network = self._create_ha_network(context,
                                                         router_db.tenant_id)

                self._set_vr_id(context, router_db, ha_network)
                self._create_ha_interfaces(context, router_db, ha_network)
                self._notify_ha_interfaces_updated(context, router_db.id)
            except Exception:
                with excutils.save_and_reraise_exception():
                    self.delete_router(context, router_dict['id'])
            router_dict['ha_vr_id'] = router_db.extra_attributes.ha_vr_id
        return router_dict

_is_ha determines whether this router should be created as an HA router:

    @classmethod
    def _is_ha(cls, router):
        ha = router.get('ha')
        if not attributes.is_attr_set(ha):
            ha = cfg.CONF.l3_ha
        return ha

If the request does not specify whether the router should be HA, the l3_ha option in the configuration file decides whether an HA router is created; otherwise, the HA attribute in the request decides.
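
For reference, the L3 HA related options live on the server side in neutron.conf. A minimal sketch (option names as in the Juno/Kilo code base; the values shown are only illustrative):

    [DEFAULT]
    # create routers as HA by default when the API request does not set "ha"
    l3_ha = True
    # how many L3 agents (i.e. replicas) each HA router is scheduled to
    max_l3_agents_per_router = 3
    min_l3_agents_per_router = 2
    # address range carved up into the per-tenant HA networks used for VRRP
    l3_ha_net_cidr = 169.254.192.0/18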

The code that follows is the generic router DB creation code: it records the router's information in the database, and if the request includes an external gateway, it creates the corresponding gw_port and records that as well. This part is generic and has little to do with L3 HA.

The code further down is HA-specific. In Neutron's current HaRouter implementation, an HA router uses an HA network, created without a tenant id, to exchange heartbeat traffic. So such an HA network is created here; as the code shows, there is only one HA network per tenant id (note again that the HA network itself carries no tenant id):

        args = {'network':
                {'name': constants.HA_NETWORK_NAME % tenant_id,
                 'tenant_id': '',
                 'shared': False,
                 'admin_state_up': True,
                 'status': constants.NET_STATUS_ACTIVE}}
        network = self._core_plugin.create_network(admin_ctx, args)

Once the HA network exists, two main things happen. First, a VRID (the virtual router id used by the VRRP protocol) is allocated for the HA router. Second, the ports used for communication on the HA network are created; if the configuration file says a router should be replicated three times, three ports are created. The code is as follows.

Allocating the VRID from the database:

    def _allocate_vr_id(self, context, network_id, router_id):
        for count in range(MAX_ALLOCATION_TRIES):
            try:
                with context.session.begin(subtransactions=True):
                    allocated_vr_ids = self._get_allocated_vr_id(context,
                                                                 network_id)
                    available_vr_ids = VR_ID_RANGE - allocated_vr_ids

                    if not available_vr_ids:
                        raise l3_ha.NoVRIDAvailable(router_id=router_id)

                    allocation = L3HARouterVRIdAllocation()
                    allocation.network_id = network_id
                    allocation.vr_id = available_vr_ids.pop()

                    context.session.add(allocation)

                    return allocation.vr_id

            except db_exc.DBDuplicateEntry:
                LOG.info(_LI("Attempt %(count)s to allocate a VRID in the "
                             "network %(network)s for the router %(router)s"),
                         {'count': count, 'network': network_id,
                          'router': router_id})

Creating the HA ports:

    def _create_ha_interfaces(self, context, router, ha_network):
        admin_ctx = context.elevated()

        num_agents = self.get_number_of_agents_for_scheduling(context)

        port_ids = []
        try:
            for index in range(num_agents):
                binding = self.add_ha_port(admin_ctx, router.id,
                                           ha_network.network['id'],
                                           router.tenant_id)
                port_ids.append(binding.port_id)
        except Exception:
            with excutils.save_and_reraise_exception():
                for port_id in port_ids:
                    self._core_plugin.delete_port(admin_ctx, port_id,
                                                  l3_port_check=False)

Besides creating the port, the add_ha_port call above also records the binding between the port and the router in the ha_router_agent_port_bindings table.
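
A rough sketch of what add_ha_port does, based on the description above (the exact port attributes may differ between releases): it creates a tenant-less port on the HA network and then inserts the binding row via the L3HARouterAgentPortBinding model:

    def add_ha_port(self, context, router_id, network_id, tenant_id):
        # create the HA port itself on the tenant-less HA network
        port = self._core_plugin.create_port(context, {
            'port': {'tenant_id': '',
                     'network_id': network_id,
                     'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
                     'mac_address': attributes.ATTR_NOT_SPECIFIED,
                     'admin_state_up': True,
                     'device_id': router_id,
                     'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
                     'name': constants.HA_PORT_NAME % tenant_id}})

        # record the port <-> router relation; this is the row in the
        # ha_router_agent_port_bindings table mentioned above
        portbinding = L3HARouterAgentPortBinding(port_id=port['id'],
                                                 router_id=router_id)
        context.session.add(portbinding)
        return portbinding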

Once all of this is done, a fanout message is sent to the L3 agents to tell them that interfaces have changed. Every L3 agent responds to this message by performing a sync of its own routers.
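
The notification itself is just a thin wrapper around the routers_updated RPC notification; roughly (a sketch of the helper named above):

    def _notify_ha_interfaces_updated(self, context, router_id):
        # sent to the L3 agents via the standard l3 rpc notifier;
        # shuffle_agents spreads the resulting resyncs across the agents
        self.l3_rpc_notifier.routers_updated(
            context, [router_id], shuffle_agents=True)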

Now let's look at what the L3 agent does. As we know, the L3 agent processes router information with a producer-consumer model:

    def after_start(self):
        eventlet.spawn_n(self._process_routers_loop)
        LOG.info(_LI("L3 agent started"))
        # When L3 agent is ready, we immediately do a full sync
        self.periodic_sync_routers_task(self.context)

self.periodic_sync_routers_task fetches router information from the server, while self._process_routers_loop processes those messages. So let's first look at self.periodic_sync_routers_task. It retrieves the routers that this particular L3 agent's host is supposed to own (which means that if this host has been chosen to host an L3 HA router, it will receive that router's information along with one HA port to plug). The code is as follows:

        try:
            if self.conf.use_namespaces:
                routers = self.plugin_rpc.get_routers(context)
            else:
                routers = self.plugin_rpc.get_routers(context,
                                                      [self.conf.router_id])

The server-side implementation of this RPC is:

    def sync_routers(self, context, **kwargs):
        """Sync routers according to filters to a specific agent.

        @param context: contain user information
        @param kwargs: host, router_ids
        @return: a list of routers
                 with their interfaces and floating_ips
        """
        router_ids = kwargs.get('router_ids')
        host = kwargs.get('host')
        context = neutron_context.get_admin_context()
        if not self.l3plugin:
            routers = {}
            LOG.error(_LE('No plugin for L3 routing registered! Will reply '
                          'to l3 agent with empty router dictionary.'))
        elif utils.is_extension_supported(
                self.l3plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
            if cfg.CONF.router_auto_schedule:
                self.l3plugin.auto_schedule_routers(context, host, router_ids)
            routers = (
                self.l3plugin.list_active_sync_routers_on_active_l3_agent(
                    context, host, router_ids))
        else:
            routers = self.l3plugin.get_sync_data(context, router_ids)
        if utils.is_extension_supported(
            self.plugin, constants.PORT_BINDING_EXT_ALIAS):
            self._ensure_host_set_on_ports(context, host, routers)
        LOG.debug("Routers returned to l3 agent:\n %s",
                  utils.DelayedStringRenderer(jsonutils.dumps,
                                              routers, indent=5))
        return routers

Looking only at the part relevant to L3 HA:

        elif utils.is_extension_supported(
                self.l3plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
            if cfg.CONF.router_auto_schedule:
                self.l3plugin.auto_schedule_routers(context, host, router_ids)
            routers = (
                self.l3plugin.list_active_sync_routers_on_active_l3_agent(
                    context, host, router_ids))

The default L3 scheduling rule works like this: if a router is not running on any L3 agent's host (i.e. it is not bound to any agent), then when some L3 agent's self.l3plugin.auto_schedule_routers runs, the router gets bound to that agent's host (a binding row is inserted into the database). So if many routers are unbound and a new L3 agent suddenly comes up, all of those routers end up running on that agent's host.
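
That binding is nothing more than a row in the router/agent binding table; a simplified sketch of what bind_router in the scheduler DB code amounts to (model name and error handling abbreviated, so treat this as illustrative):

    def bind_router(self, context, router_id, chosen_agent):
        """Insert the binding row tying the router to the chosen L3 agent."""
        try:
            with context.session.begin(subtransactions=True):
                binding = RouterL3AgentBinding()
                binding.l3_agent = chosen_agent
                binding.router_id = router_id
                context.session.add(binding)
        except db_exc.DBDuplicateEntry:
            # another server bound this router concurrently; nothing to do
            LOG.debug('Router %s is already hosted by L3 agent %s',
                      router_id, chosen_agent.id)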

That is the ordinary rule. L3 HA routers are handled a bit differently; auto_schedule_routers contains the following HA-specific code:

            if utils.is_extension_supported(
                    plugin, constants.L3_HA_MODE_EXT_ALIAS):
                return self._schedule_ha_routers_to_additional_agent(
                    plugin, context, l3_agent)

    def _schedule_ha_routers_to_additional_agent(self, plugin, context, agent):
        """Bind already scheduled routers to the agent.

        Retrieve the number of agents per router and check if the router has
        to be scheduled on the given agent if max_l3_agents_per_router
        is not yet reached.
        """

        routers_agents = plugin.get_ha_routers_l3_agents_count(context)

        scheduled = False
        admin_ctx = context.elevated()
        for router_id, tenant_id, agents in routers_agents:
            max_agents_not_reached = (
                not self.max_ha_agents or agents < self.max_ha_agents)
            if max_agents_not_reached:
                if not self._router_has_binding(admin_ctx, router_id,
                                                agent.id):
                    self._create_ha_router_binding(plugin, admin_ctx,
                                                   router_id, tenant_id,
                                                   agent)
                    scheduled = True

        return scheduled

The logic here is: fetch the router-to-agent relations; if the number of agents a router is bound to is below what the configuration requires, check whether the current agent's host is already bound to that router, and bind it if not.

The code after that gathers the router's details, for example how many ports it has and which floating IPs it owns. The difference for an HA router is that the HA port information is added as well:

        for binding in bindings:
            port_dict = self._core_plugin._make_port_dict(binding.port)

            router = routers_dict.get(binding.router_id)
            router[constants.HA_INTERFACE_KEY] = port_dict
            router[constants.HA_ROUTER_STATE_KEY] = binding.state

Note the HA router's HA interface here. As the code above already showed, if a router has a redundancy of three, three HA interfaces are created when the router is created. When an agent makes its request, it should only receive the one interface bound to its host. The code is as follows:

        bindings = self.get_ha_router_port_bindings(context,
                                                    routers_dict.keys(),
                                                    host)
        for binding in bindings:
            port_dict = self._core_plugin._make_port_dict(binding.port)

            router = routers_dict.get(binding.router_id)
            router[constants.HA_INTERFACE_KEY] = port_dict
            router[constants.HA_ROUTER_STATE_KEY] = binding.state

    def get_ha_router_port_bindings(self, context, router_ids, host=None):
        if not router_ids:
            return []
        query = context.session.query(L3HARouterAgentPortBinding)

        if host:
            query = query.join(agents_db.Agent).filter(
                agents_db.Agent.host == host)

        query = query.filter(
            L3HARouterAgentPortBinding.router_id.in_(router_ids))

        return query.all()

Now that the L3 agent has the router information, the next step is to process it. For a newly added router, the entry point is:

    def _router_added(self, router_id, router):
        ri = self._create_router(router_id, router)
        registry.notify(resources.ROUTER, events.BEFORE_CREATE,
                        self, router=ri)

        self.router_info[router_id] = ri

        ri.initialize(self.process_monitor)

        # TODO(Carl) This is a hook in to fwaas.  It should be cleaned up.
        self.process_router_add(ri)

Here _create_router returns a HaRouter:

    def _create_router(self, router_id, router):
        # TODO(Carl) We need to support a router that is both HA and DVR.  The
        # patch that enables it will replace these lines.  See bug #1365473.
        if router.get('distributed') and router.get('ha'):
            raise n_exc.DvrHaRouterNotSupported(router_id=router_id)

        args = []
        kwargs = {
            'router_id': router_id,
            'router': router,
            'use_ipv6': self.use_ipv6,
            'agent_conf': self.conf,
            'interface_driver': self.driver,
        }

        if router.get('distributed'):
            kwargs['agent'] = self
            kwargs['host'] = self.host
            return dvr_router.DvrRouter(*args, **kwargs)

        if router.get('ha'):
            kwargs['state_change_callback'] = self.enqueue_state_change
            return ha_router.HaRouter(*args, **kwargs)

        return legacy_router.LegacyRouter(*args, **kwargs)

Keepalived already shows up in HaRouter's constructor:

    class HaRouter(router.RouterInfo):
        def __init__(self, state_change_callback, *args, **kwargs):
            super(HaRouter, self).__init__(*args, **kwargs)

            self.ha_port = None
            self.keepalived_manager = None
            self.state_change_callback = state_change_callback

Let's look at the initialize method:

    def initialize(self, process_monitor):
        super(HaRouter, self).initialize(process_monitor)
        ha_port = self.router.get(n_consts.HA_INTERFACE_KEY)
        if not ha_port:
            LOG.error(_LE('Unable to process HA router %s without HA port'),
                      self.router_id)
            return

        self.ha_port = ha_port
        self._init_keepalived_manager(process_monitor)
        self.ha_network_added()
        self.update_initial_state(self.state_change_callback)
        self.spawn_state_change_monitor(process_monitor)

As you can see, the router first calls its parent class's initialize method. After that it initializes keepalived, plugs the HA port into br-int, updates the initial state, and starts the state-change monitor. Let's look at each step in detail:

    def _init_keepalived_manager(self, process_monitor):
        self.keepalived_manager = keepalived.KeepalivedManager(
            self.router['id'],
            keepalived.KeepalivedConf(),
            process_monitor,
            conf_path=self.agent_conf.ha_confs_path,
            namespace=self.ns_name)

        config = self.keepalived_manager.config

        interface_name = self.get_ha_device_name()
        subnets = self.ha_port.get('subnets', [])
        ha_port_cidrs = [subnet['cidr'] for subnet in subnets]
        instance = keepalived.KeepalivedInstance(
            'BACKUP',
            interface_name,
            self.ha_vr_id,
            ha_port_cidrs,
            nopreempt=True,
            advert_int=self.agent_conf.ha_vrrp_advert_int,
            priority=self.ha_priority)
        instance.track_interfaces.append(interface_name)

        if self.agent_conf.ha_vrrp_auth_password:
            # TODO(safchain): use oslo.config types when it will be available
            # in order to check the validity of ha_vrrp_auth_type
            instance.set_authentication(self.agent_conf.ha_vrrp_auth_type,
                                        self.agent_conf.ha_vrrp_auth_password)

        config.add_instance(instance)

What this is really doing is building the contents of the keepalived.conf for this HA interface. As you can imagine, the keepalived.conf file will shortly be generated from this information.

    def ha_network_added(self):
        interface_name = self.get_ha_device_name()

        self.driver.plug(self.ha_port['network_id'],
                         self.ha_port['id'],
                         interface_name,
                         self.ha_port['mac_address'],
                         namespace=self.ns_name,
                         prefix=HA_DEV_PREFIX)
        ip_cidrs = common_utils.fixed_ip_cidrs(self.ha_port['fixed_ips'])
        self.driver.init_l3(interface_name, ip_cidrs,
                            namespace=self.ns_name,
                            preserve_ips=[self._get_primary_vip()])

Here the HA interface is plugged into br-int, and its IP addresses and routes are configured.

    def update_initial_state(self, callback):
        ha_device = ip_lib.IPDevice(
            self.get_ha_device_name(),
            self.ns_name)
        addresses = ha_device.addr.list()
        cidrs = (address['cidr'] for address in addresses)
        ha_cidr = self._get_primary_vip()
        state = 'master' if ha_cidr in cidrs else 'backup'
        self.ha_state = state
        callback(self.router_id, state)

This mainly determines the current HA state and invokes the callback. The callback is:

        if router.get('ha'):
            kwargs['state_change_callback'] = self.enqueue_state_change
            return ha_router.HaRouter(*args, **kwargs)

    def enqueue_state_change(self, router_id, state):
        LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
                 {'router_id': router_id,
                  'state': state})

        try:
            ri = self.router_info[router_id]
        except AttributeError:
            LOG.info(_LI('Router %s is not managed by this agent. It was '
                         'possibly deleted concurrently.'), router_id)
            return

        self._update_metadata_proxy(ri, router_id, state)
        self._update_radvd_daemon(ri, state)
        self.state_change_notifier.queue_event((router_id, state))

Along with this callback, the agent sets up a greenlet listening on a Unix socket that keeps receiving HA state-change messages. The batched events are then handled by:

    def notify_server(self, batched_events):
        translation_map = {'master': 'active',
                           'backup': 'standby',
                           'fault': 'standby'}
        translated_states = dict((router_id, translation_map[state]) for
                                 router_id, state in batched_events)
        LOG.debug('Updating server with HA routers states %s',
                  translated_states)
        self.plugin_rpc.update_ha_routers_states(
            self.context, translated_states)

update_ha_routers_states in turn updates the binding state in the database:

    @classmethod
    def _set_router_states(cls, context, bindings, states):
        for binding in bindings:
            try:
                with context.session.begin(subtransactions=True):
                    binding.state = states[binding.router_id]
            except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError):
                # Take concurrently deleted routers in to account
                pass

Note that the only thing changed here is the state stored in the database.
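
That stored state is also what the admin-facing commands report. For instance, with a client that exposes the ha_state column (the output below is only illustrative; muti-devstack-01 is a hypothetical second host):

    $ neutron l3-agent-list-hosting-router <router-id>
    +----------+------------------+----------------+-------+----------+
    | id       | host             | admin_state_up | alive | ha_state |
    +----------+------------------+----------------+-------+----------+
    | 5a8f...  | muti-devstack-01 | True           | :-)   | active   |
    | c3d2...  | muti-devstack-02 | True           | :-)   | standby  |
    +----------+------------------+----------------+-------+----------+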

Finally, spawn_state_change_monitor starts the neutron-keepalived-state-change process, which uses the ip command to watch IP address changes inside the router's namespace. The reason for this is that Neutron has no direct way of knowing which router instance is currently the master and which is the slave, so it watches the IP addresses on the interface (changes made by keepalived) to determine which one is master and which is backup. When the ip output matches the expected condition, a message is sent to the Unix socket started by the code above, which in turn takes care of updating the state in the database.
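
Conceptually, what that helper process does is equivalent to watching the address events in the router namespace with something like the following and looking for the primary VIP CIDR being added to or removed from the ha- device (the command below is an illustration, not the literal implementation):

    $ sudo ip netns exec qrouter-<router-id> ip -o monitor address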

Next, let's look at process:

    def process(self, agent):
        super(HaRouter, self).process(agent)

        if self.ha_port:
            self.enable_keepalived()

As you can see, an HA router behaves the same as an ordinary router here: both call the parent class's process method, which handles the internal and external interfaces and sets up routes, SNAT, and so on. In other words, every HA router replica ends up with these rules (for example, they all get the NAT rules; the routes also start out identical, but keepalived will change them later).

However, the HA router overrides some of the port-related methods used by process, for example:

    def internal_network_added(self, port):
        port_id = port['id']
        interface_name = self.get_internal_device_name(port_id)

        if not ip_lib.device_exists(interface_name, namespace=self.ns_name):
            self.driver.plug(port['network_id'],
                             port_id,
                             interface_name,
                             port['mac_address'],
                             namespace=self.ns_name,
                             prefix=router.INTERNAL_DEV_PREFIX)

        self._disable_ipv6_addressing_on_interface(interface_name)
        for ip_cidr in common_utils.fixed_ip_cidrs(port['fixed_ips']):
            self._add_vip(ip_cidr, interface_name)

Here the addresses are added as VIPs (that is, the qr and qg IPs), and the work that init_l3 would normally do is in effect left to keepalived.
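
_add_vip itself does little more than record the CIDR on the KeepalivedInstance built earlier, so that it ends up in the VIP sections of the generated configuration; roughly (a sketch):

    def _add_vip(self, ip_cidr, interface, scope=None):
        # registered as a VIP on the VRRP instance: keepalived will configure
        # this address on the interface only on the node elected master
        instance = self._get_keepalived_instance()
        instance.add_vip(ip_cidr, interface, scope)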

One important thing the HA router does here is start keepalived. The code is as follows:

    def spawn(self):
        config_path = self._output_config_file()

        def callback(pid_file):
            cmd = ['keepalived', '-P',
                   '-f', config_path,
                   '-p', pid_file,
                   '-r', '%s-vrrp' % pid_file]
            return cmd

        pm = self.get_process(callback=callback)
        pm.enable(reload_cfg=True)

        self.process_monitor.register(uuid=self.resource_id,
                                      service_name=KEEPALIVED_SERVICE_NAME,
                                      monitored_process=pm)

        LOG.debug('Keepalived spawned with config %s', config_path)

First the keepalived configuration file is generated, and then the process is started. The configuration file is produced from the instance information built earlier; it is essentially a matter of concatenating strings and writing them to a file:

    def build_config(self):
        config = ['vrrp_instance %s {' % self.name,
                  '    state %s' % self.state,
                  '    interface %s' % self.interface,
                  '    virtual_router_id %s' % self.vrouter_id,
                  '    priority %s' % self.priority]

        if self.nopreempt:
            config.append('    nopreempt')

        if self.advert_int:
            config.append('    advert_int %s' % self.advert_int)

        if self.authentication:
            auth_type, password = self.authentication
            authentication = ['    authentication {',
                              '        auth_type %s' % auth_type,
                              '        auth_pass %s' % password,
                              '    }']
            config.extend(authentication)

        if self.mcast_src_ip:
            config.append('    mcast_src_ip %s' % self.mcast_src_ip)

        if self.track_interfaces:
            config.extend(self._build_track_interface_config())

        config.extend(self._build_vips_config())

        if len(self.virtual_routes):
            config.extend(self.virtual_routes.build_config())

        config.append('}')

        return config

An example of the generated file looks like this:

    [root@muti-devstack-02 6e7665ca-9b87-47a8-a570-b6bc999060fb]# cat keepalived.conf
    vrrp_instance VR_2 {
        state BACKUP
        interface ha-e4557083-e3
        virtual_router_id 2
        priority 50
        nopreempt
        advert_int 2
        track_interface {
            ha-e4557083-e3
        }
        virtual_ipaddress {
            169.254.0.2/24 dev ha-e4557083-e3
        }
        virtual_ipaddress_excluded {
            10.0.2.26/24 dev qg-73af0ee6-d7
            2001:db8::8/64 dev qg-73af0ee6-d7
            fe80::f816:3eff:fec8:af93/64 dev qg-73af0ee6-d7 scope link
        }
        virtual_routes {
            0.0.0.0/0 via 10.0.2.1 dev qg-73af0ee6-d7
            ::/0 via 2001:db8::2 dev qg-73af0ee6-d7
        }
    }
    [root@muti-devstack-02 6e7665ca-9b87-47a8-a570-b6bc999060fb]# pwd
    /opt/stack/data/neutron/ha_confs/6e7665ca-9b87-47a8-a570-b6bc999060fb

As you can see, the HA port is used for heartbeats, and its IP is carried inside the VRRP packets. virtual_ipaddress_excluded lists the IPs that keepalived has to manage but does not transmit in the VRRP packets; in other words, when a router instance is elected master, keepalived configures the corresponding IPs on its qg interface. Likewise, the virtual_routes are installed when an instance becomes master. Conversely, when an instance becomes backup, all of these operations are undone.

Now let's see how keepalived learns about changes to its configuration file. When a router's information changes, the l3 agent receives the new data and reruns the code above, regenerating the configuration file. So how does keepalived pick up the new configuration? keepalived handles the HUP signal: on receiving it, it reloads its configuration. Therefore, when the spawn path finds that the process is already alive, all it has to do is send HUP:

    def enable(self, cmd_callback=None, reload_cfg=False):
        if not self.active:
            if not cmd_callback:
                cmd_callback = self.default_cmd_callback
            cmd = cmd_callback(self.get_pid_file_name())

            ip_wrapper = ip_lib.IPWrapper(namespace=self.namespace)
            ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env,
                                     run_as_root=self.run_as_root)
        elif reload_cfg:
            self.reload_cfg()

    def reload_cfg(self):
        self.disable('HUP')

    def disable(self, sig='9'):
        pid = self.pid

        if self.active:
            cmd = ['kill', '-%s' % (sig), pid]
            utils.execute(cmd, run_as_root=True)
            # In the case of shutting down, remove the pid file
            if sig == '9':
                fileutils.delete_if_exists(self.get_pid_file_name())
        elif pid:
            LOG.debug('Process for %(uuid)s pid %(pid)d is stale, ignoring '
                      'signal %(signal)s', {'uuid': self.uuid, 'pid': pid,
                                            'signal': sig})
        else:
            # the original excerpt ends here; the remaining branch only logs
            # that no process has been started for this uuid
            pass

With Neutron L3 HA, the keepalived processes always appear in pairs: a parent process and a VRRP protocol child process. The HUP signal here is always sent to the parent process.
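
In other words, a manual reload would look roughly like this, using the pid file that the ProcessManager maintains (the path below is illustrative, not the exact layout):

    $ pid=$(cat /opt/stack/data/neutron/ha_confs/<router-id>.pid)   # illustrative path
    $ sudo kill -HUP "$pid"        # parent keepalived re-reads keepalived.conf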
