JUNO NEUTRON中的plugin和extension介绍及加载机制

这篇文章会理一下neutron中plugin和extension的加载机制。版本是最新的主干分支的代码。

首先先介绍点背景知识:

plugin主要用于提供neutron的核心功能,目前主要是提供network,subnet和port这三类资源(resource,为什么叫资源?应该是REST的原因吧)。plugin分两类:core_plugin和service_plugin。neutron虽然有很多vpnaas、lbaas之类的功能,但这些目前都不是core_plugin,而只是属于servie_plugin。目前core_plugin只是操作子网,管理子网网段以及操作端口。说的简单点core_plugin就是收到命令去和XXX-agent沟通,XXX-agent在交换机上执行具体的命令的一个plugin,功能偏向二层。平时常用的ml2就是一个core_plugin,用于和linbuxbridge或openvswitch交互来实现具体的网络管理。如果顺着ml2往下挖的话可以看到TypeManager、MechanismManager之类的东东,这些在官网或网上的很多文档里都有,但是对于理解plugin和extension其实关系不大,初次学习的人可以认为TypeManager、MechanismManager这类只是ml2这个core_plugin的一种实现方法,可以在对plugin和extension有了一定认识后在研究ml2的实现的时候再来看他们。

core_plugin按照标准,都是继承NeutronPluginBaseV2这个抽象类的。换句话说按照标准,实现一个core_plugin必须实现NeutronPluginBaseV2这个抽象类的所有方法。NeutronPluginBaseV2的主要method有:
create_(network/port/subnet)
delete_(network/port/subnet)
get_(network/networks/port/ports/subnet/subnets)
update_(network/port/subnet)
这些method基本上就能实现对交换机的常见操作了。

我们都知道,neutron对外提供的功能都是通过REST API来的,核心的REST API接口列表参见:http://developer.openstack.org/api-ref-networking-v2.html。比如port这个资源的操作,我可以通过GET/POST/DELETE等方法操作/v2.0/ports这个地址来进行管理。neutron是怎么把core_plugin的method和这些URL关联起来的呢?和openstack的其他组件一样,目前也是通过routes来关联的。routes以及其他组件的关联方法可以参见小秦之前的文章。neutron在这里有一点和其他组件不同,其它组件的地址是直接指定的,比如:

class Ec2Extension(wsgi.ExtensionRouter):
    def add_routes(self, mapper):
        ec2_controller = controllers.Ec2Controller()
        # validation
        mapper.connect(
            '/ec2tokens',
            controller=ec2_controller,
            action='authenticate',
            conditions=dict(method=['POST']))
 
        # crud
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2',
            controller=ec2_controller,
            action='create_credential',
            conditions=dict(method=['POST']))

这里的路径,如’/ec2tokens’、’/users/{user_id}/credentials/OS-EC2’是明确给出的。但是在neutron中则用了一些技巧。这点后面会分析。

core_plugin按照上面讲的,主要是实现二层的功能,那么service_plugin呢?从目前的代码来看,core_plugin应该是最早的neutron的那部分功能(如果理解有误请指出,多谢)。随着neutron的发展,lbaas、vpnaas这类功能慢慢的也出现了,这些功能现在也是通过一个一个plugin加入到neutron中。从下面的代码里可以看到,目前的neutron支持的plugin主要有:

# service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"

而且,如果把neutorn看成由一个一个plugin组成的组件的话,core_plugin其实也可以看成service_plugin的一种,因此neutorn中有如下的代码:

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()

从注释中可以看到,以后core_plugin最终也是会和service_plugin使用相同的方式维护的。只是目前或许由于历史原因,太多的组件依赖core_plugin,所以现在core_plugin和service_plugin在概念上还有一定区别。

简单介绍了plugin后,小秦来介绍下extension。一开始对plugin和extension的区别理解的不是很清楚,现在大概是清楚了。有三种类型的extension:
a.直接实现了某种新的resource。比如实现了资源XXX,那么我可以通过/v2.0/XXX进行XXX资源的操作。这个是最类似于实现一个新的service_plugin的
b.对现有的某种资源添加某种操作功能。比如对于ports资源,我想有一个动作是做绑定(打个比方,不一定确切),则可以通过extension在现有的plugin基础上增加功能,比如对/v2.0/ports增加/action接口
c.对现有的某个REST API请求增加参数。比如对于/v2.0/ports我本来创建的时候什么参数都不用提供,现在我希望POST请求能带上参数NAME,则可以通过extension来实现

从上面的功能小秦猜测,核心的API是稳定的,很少变动的,这类API由plugin实现。如果要有新的功能或特性,则先是通过extension实现。如果extension非常有用,或许有一天就会被并入plugin。不过在接下里的代码分析中,当看到如router、lbaas这类服务的URL与action映射关系生成的时候,可能大家就会有新的理解了。

ok,背景介绍完了,我们来回到正题。下面看下plugin和extension的加载机制,也就是plugin和extension是如何生成对应的wsgi app的。

开始看代码:
首先看paste-api.ini,内容为:

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = request_id catch_errors extensions neutronapiapp_v2_0
keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[filter:request_id]
paste.filter_factory = oslo.middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo.middleware:CatchErrors.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

这里neutron自己实现了一个use方法“call:neutron.auth:pipeline_factory”,主要用于选择是否走keystone认证。重点是最后的两个:extensions和neutronapiapp_v2_0。我们先看neutronapiapp_v2_0。

neutronapiapp_v2_0:
neutronapiapp_v2_0的实现是neutron.api.v2.router:APIRouter.factory,是个很普通的factory,返回自身生成的一个实例:

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

重点看这个实例,其构造函数很清楚的分为了几个部分,我们一行一行来看。首先:

        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()

第一行是获取一个mapper,这个mapper会用于构造所有neutronapiapp_v2_0这个app涉及到的url映射。plugin = manager.NeutronManager.get_plugin()这行代码加载了我们的plugin,所有的plugin。我们来看下其实现:

    @classmethod
    def get_instance(cls):
        # double checked locking
        if not cls.has_instance():
            cls._create_instance()
        return cls._instance

    @classmethod
    def get_plugin(cls):
        # Return a weakref to minimize gc-preventing references.
        return weakref.proxy(cls.get_instance().plugin)

这里其实是一个单例,重点是cls._create_instance()。为什么是单例的原因应该是不希望plugin加载多次。来看下cls._create_instance():

    @classmethod
    @utils.synchronized("manager")
    def _create_instance(cls):
        if not cls.has_instance():
            cls._instance = cls()

可以看到,这里的instance其实就是它自己的class的一个实例,class为:

class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """

这个类的构造方法会加载所有的plugin,按照上面小秦给的说明,plugin是neutron真正的核心,所以这个类叫NeutronManager也是名副其实啦。看下plugin是怎么被加载的吧,简单的来说,主要就是通过stevedore根据配置文件查找entry_point来加载的,stevedore的说明可以参考小秦博客中的介绍。来看下NeutronManager的构造方法:

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()

validate_pre_plugin_load主要是检查下配置文件中有没有指定core_plugin。如果指定了,那么_get_plugin_instance会的加载core_plugin,而_load_service_plugins则会加载service_plugin,这些plugin的可选的实现可以在setup.cfg中找到具体的实现类:

neutron.core_plugins =
    bigswitch = neutron.plugins.bigswitch.plugin:NeutronRestProxyV2
    brocade = neutron.plugins.brocade.NeutronPlugin:BrocadePluginV2
    cisco = neutron.plugins.cisco.network_plugin:PluginV2
    embrane = neutron.plugins.embrane.plugins.embrane_ml2_plugin:EmbraneMl2Plugin
    hyperv = neutron.plugins.hyperv.hyperv_neutron_plugin:HyperVNeutronPlugin
    ibm = neutron.plugins.ibm.sdnve_neutron_plugin:SdnvePluginV2
    midonet = neutron.plugins.midonet.plugin:MidonetPluginV2
    ml2 = neutron.plugins.ml2.plugin:Ml2Plugin
    mlnx = neutron.plugins.mlnx.mlnx_plugin:MellanoxEswitchPlugin
    nec = neutron.plugins.nec.nec_plugin:NECPluginV2
    nuage = neutron.plugins.nuage.plugin:NuagePlugin
    metaplugin = neutron.plugins.metaplugin.meta_neutron_plugin:MetaPluginV2
    oneconvergence = neutron.plugins.oneconvergence.plugin:OneConvergencePluginV2
    plumgrid = neutron.plugins.plumgrid.plumgrid_plugin.plumgrid_plugin:NeutronPluginPLUMgridV2
    ryu = neutron.plugins.ryu.ryu_neutron_plugin:RyuNeutronPluginV2
    vmware = neutron.plugins.vmware.plugin:NsxPlugin
neutron.service_plugins =
    dummy = neutron.tests.unit.dummy_plugin:DummyServicePlugin
    router = neutron.services.l3_router.l3_router_plugin:L3RouterPlugin
    bigswitch_l3 = neutron.plugins.bigswitch.l3_router_plugin:L3RestProxy
    firewall = neutron.services.firewall.fwaas_plugin:FirewallPlugin
    lbaas = neutron.services.loadbalancer.plugin:LoadBalancerPlugin
    vpnaas = neutron.services.vpn.plugin:VPNDriverPlugin
    metering = neutron.services.metering.metering_plugin:MeteringPlugin

加载的方法就是标准的stevedore的调用:

    def _get_plugin_instance(self, namespace, plugin_provider):
        try:
            # Try to resolve plugin by name
            mgr = driver.DriverManager(namespace, plugin_provider)
            plugin_class = mgr.driver
        except RuntimeError as e1:
            # fallback to class name
            try:
                plugin_class = importutils.import_class(plugin_provider)
            except ImportError as e2:
                LOG.exception(_LE("Error loading plugin by name, %s"), e1)
                LOG.exception(_LE("Error loading plugin by class, %s"), e2)
                raise ImportError(_("Plugin not found."))
        return plugin_class()

另外从代码self.service_plugins = {constants.CORE: self.plugin}以及_load_service_plugins下的self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst可以看到,所有的plugin(包括core_plugin和service_plugin)都会被当成service_plugin放到self.service_plugins中。

现在我们的plugin都加载好了,继续看NeutronManager的代码,下面两行是:

ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

从形式上看,这个加载extension的方法和加载plugin的差不多,都是get_instance。我们看下get_instance方法:

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls(get_extensions_path(),
                                manager.NeutronManager.get_service_plugins())
        return cls._instance

get_extensions_path不多说了,就是为了获取extension文件的路径。get_service_plugins根据上面的分析,得到的是所有的plugin。真正这里干活的还是cls的构造方法,cls是PluginAwareExtensionManager,从名字里可以看到,这里的extension是感知到plugin的。来看下构造方法:

class PluginAwareExtensionManager(ExtensionManager):

    _instance = None

    def __init__(self, path, plugins):
        self.plugins = plugins
        super(PluginAwareExtensionManager, self).__init__(path)
        self.check_if_plugin_extensions_loaded()

真正的加载extension由其父类完成,代码是:

class ExtensionManager(object):
    """Load extensions from the configured extension path.

    See tests/unit/extensions/foxinsocks.py for an
    example extension implementation.
    """

    def __init__(self, path):
        LOG.info(_LI('Initializing extension manager.'))
        self.path = path
        self.extensions = {}
        self._load_all_extensions()

_load_all_extensions的重点代码是:

    def _load_all_extensions_from_path(self, path):
        # Sorting the extension list makes the order in which they
        # are loaded predictable across a cluster of load-balanced
        # Neutron Servers
        for f in sorted(os.listdir(path)):
            try:
                LOG.debug('Loading extension file: %s', f)
                mod_name, file_ext = os.path.splitext(os.path.split(f)[-1])
                ext_path = os.path.join(path, f)
                if file_ext.lower() == '.py' and not mod_name.startswith('_'):
                    mod = imp.load_source(mod_name, ext_path)
                    ext_name = mod_name[0].upper() + mod_name[1:]
                    new_ext_class = getattr(mod, ext_name, None)
                    if not new_ext_class:
                        LOG.warn(_LW('Did not find expected name '
                                     '"%(ext_name)s" in %(file)s'),
                                 {'ext_name': ext_name,
                                  'file': ext_path})
                        continue
                    new_ext = new_ext_class()
                    self.add_extension(new_ext)
            except Exception as exception:
                LOG.warn(_LW("Extension file %(f)s wasn't loaded due to "
                             "%(exception)s"),
                         {'f': f, 'exception': exception})

可以看到,其会在extension的目录搜索.py结尾的文件,然后动态加载。self.add_extension(new_ext)会对加载的extension做些检查:

    def _check_extension(self, extension):
        """Checks for required methods in extension objects."""
        try:
            LOG.debug('Ext name: %s', extension.get_name())
            LOG.debug('Ext alias: %s', extension.get_alias())
            LOG.debug('Ext description: %s', extension.get_description())
            LOG.debug('Ext namespace: %s', extension.get_namespace())
            LOG.debug('Ext updated: %s', extension.get_updated())
        except AttributeError as ex:
            LOG.exception(_LE("Exception loading extension: %s"), unicode(ex))
            return False
        return True

此时,extension就都加载好了。和plugin加载的区别是,plugin需要在配置文件中指定要加载什么plugin,而extension则只要放在目录中即可。

PluginAwareExtensionManager在这之后会做个检查。有些plugin需要依赖一些extension来完成一些功能(这点是我觉得不太能理解的,既然我的plugin是base,为啥plugin要依赖extension?不过目前看来就是如此)。plugin如果需要依赖某些extension的话,需要在对应的类中指定supported_extension_aliases属性,该属性是个列表,包含了需要依赖的extension,比如:

    # List of supported extensions
    _supported_extension_aliases = ["provider", "external-net", "binding",
                                    "quotas", "security-group", "agent",
                                    "dhcp_agent_scheduler",
                                    "multi-provider", "allowed-address-pairs",
                                    "extra_dhcp_opt"]

检查的方法其实就是把当前加载了的extension与plugin需要的extension做个比较,如果缺了就报错:

    def check_if_plugin_extensions_loaded(self):
        """Check if an extension supported by a plugin has been loaded."""
        plugin_extensions = set(itertools.chain.from_iterable([
            getattr(plugin, "supported_extension_aliases", [])
            for plugin in self.plugins.values()]))
        missing_aliases = plugin_extensions - set(self.extensions)
        if missing_aliases:
            raise exceptions.ExtensionsNotFound(
                extensions=list(missing_aliases))

ok,ext_mgr = extensions.PluginAwareExtensionManager.get_instance()已经加载了所有的extension,我们看下ext_mgr.extend_resources(“2.0”, attributes.RESOURCE_ATTRIBUTE_MAP)又做了什么:

    def extend_resources(self, version, attr_map):
        """Extend resources with additional resources or attributes.

        :param: attr_map, the existing mapping from resource name to
        attrs definition.

        After this function, we will extend the attr_map if an extension
        wants to extend this map.
        """
        update_exts = []
        processed_exts = set()
        exts_to_process = self.extensions.copy()
        # Iterate until there are unprocessed extensions or if no progress
        # is made in a whole iteration
        while exts_to_process:
            processed_ext_count = len(processed_exts)
            for ext_name, ext in exts_to_process.items():
                if not hasattr(ext, 'get_extended_resources'):
                    del exts_to_process[ext_name]
                    continue
                if hasattr(ext, 'update_attributes_map'):
                    update_exts.append(ext)
                if hasattr(ext, 'get_required_extensions'):
                    # Process extension only if all required extensions
                    # have been processed already
                    required_exts_set = set(ext.get_required_extensions())
                    if required_exts_set - processed_exts:
                        continue
                try:
                    extended_attrs = ext.get_extended_resources(version)
                    for resource, resource_attrs in extended_attrs.iteritems():
                        if attr_map.get(resource, None):
                            attr_map[resource].update(resource_attrs)
                        else:
                            attr_map[resource] = resource_attrs
                except AttributeError:
                    LOG.exception(_LE("Error fetching extended attributes for "
                                      "extension '%s'"), ext.get_name())
                processed_exts.add(ext_name)
                del exts_to_process[ext_name]
            if len(processed_exts) == processed_ext_count:
                # Exit loop as no progress was made
                break
        if exts_to_process:
            # NOTE(salv-orlando): Consider whether this error should be fatal
            LOG.error(_LE("It was impossible to process the following "
                          "extensions: %s because of missing requirements."),
                      ','.join(exts_to_process.keys()))

        # Extending extensions' attributes map.
        for ext in update_exts:
            ext.update_attributes_map(attr_map)

小秦上面说过,extension主要有三个作用,包括增加新的资源或者对现有资源的请求参数增加新的内容,这里就是做这个事情了。比如:

                    extended_attrs = ext.get_extended_resources(version)
                    for resource, resource_attrs in extended_attrs.iteritems():
                        if attr_map.get(resource, None):
                            attr_map[resource].update(resource_attrs)
                        else:
                            attr_map[resource] = resource_attrs

attr_map是plugin的资源属性,如果某个extension会对属性扩充会增加新资源,那么就会提供get_extended_resources方法。

对于RESOURCE_ATTRIBUTE_MAP,我们下面会看到其作用。

ok,现在extension和plugin都加载好了,extension也对pluging原生的RESOURCE_ATTRIBUTE_MAP增加了新的属性。那么接下来就是要映射url和具体的controller action的关系了。我们看代码:

mapper.connect('index', '/', controller=Index(RESOURCES))

对于/,controller是一个Index的controller,其主要是提供resource的索引服务(类似于给个目录,REST希望有这么个东西):

class Index(wsgi.Application):
    def __init__(self, resources):
        self.resources = resources

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        metadata = {}

        layout = []
        for name, collection in self.resources.iteritems():
            href = urlparse.urljoin(req.path_url, collection)
            resource = {'name': name,
                        'collection': collection,
                        'links': [{'rel': 'self',
                                   'href': href}]}
            layout.append(resource)
        response = dict(resources=layout)
        content_type = req.best_match_content_type()
        body = wsgi.Serializer(metadata=metadata).serialize(response,
                                                            content_type)
        return webob.Response(body=body, content_type=content_type)

实现很简单啦,主要就是字符串拼接成具体的url后返回一个body。

接着看代码:

        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))

这里的RESOURCES指的是核心的resource,即:

RESOURCES = {'network': 'networks',
             'subnet': 'subnets',
             'port': 'ports'}

_map_resource的实现是:

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

核心代码是mapper.collection,mapper的collection会根据提供的参数自动的生成需要的REST映射。具体的可以参看小秦博客里routes的文章或者直接查看routes的官方文档。mapper.collection会自动的将collection、resource映射到不同的HTTP method,同时将action指定到controller的具体方法。比如官方文档的例子:

>>> map.collection("entries", "entry")
>>> print map
Route name   Methods Path                        Controller action
entries      GET     /entries{.format}           entry      index
create_entry POST    /entries{.format}           entry      create
new_entry    GET     /entries/new{.format}       entry      new
entry        GET     /entries/{id}{.format}      entry      show
update_entry PUT     /entries/{id}{.format}      entry      update
delete_entry DELETE  /entries/{id}{.format}      entry      delete
edit_entry   GET     /entries/{id}/edit{.format} entry      edit

所以在我们这边其含义就是说根据collection、resource自动生成具体的url映射关系。重点也就是controller的生成了。controller依靠base.create_resource生成,注意到这里将controller和resource看成是一个东西。我们来看下其实现:

def create_resource(collection, resource, plugin, params, allow_bulk=False,
                    member_actions=None, parent=None, allow_pagination=False,
                    allow_sorting=False):
    controller = Controller(plugin, collection, resource, params, allow_bulk,
                            member_actions=member_actions, parent=parent,
                            allow_pagination=allow_pagination,
                            allow_sorting=allow_sorting)

    return wsgi_resource.Resource(controller, FAULT_MAP)
class Controller(object):
    LIST = 'list'
    SHOW = 'show'
    CREATE = 'create'
    UPDATE = 'update'
    DELETE = 'delete'

    def __init__(self, plugin, collection, resource, attr_info,
                 allow_bulk=False, member_actions=None, parent=None,
                 allow_pagination=False, allow_sorting=False):
        if member_actions is None:
            member_actions = []
        self._plugin = plugin
        self._collection = collection.replace('-', '_')
        self._resource = resource.replace('-', '_')
        self._attr_info = attr_info
        self._allow_bulk = allow_bulk
        self._allow_pagination = allow_pagination
        self._allow_sorting = allow_sorting
        self._native_bulk = self._is_native_bulk_supported()
        self._native_pagination = self._is_native_pagination_supported()
        self._native_sorting = self._is_native_sorting_supported()
        self._policy_attrs = [name for (name, info) in self._attr_info.items()
                              if info.get('required_by_policy')]
        self._notifier = n_rpc.get_notifier('network')
        # use plugin's dhcp notifier, if this is already instantiated
        agent_notifiers = getattr(plugin, 'agent_notifiers', {})
        self._dhcp_agent_notifier = (
            agent_notifiers.get(const.AGENT_TYPE_DHCP) or
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )
        if cfg.CONF.notify_nova_on_port_data_changes:
            from neutron.notifiers import nova
            self._nova_notifier = nova.Notifier()
        self._member_actions = member_actions
        self._primary_key = self._get_primary_key()
        if self._allow_pagination and self._native_pagination:
            # Native pagination need native sorting support
            if not self._native_sorting:
                raise exceptions.Invalid(
                    _("Native pagination depend on native sorting")
                )
            if not self._allow_sorting:
                LOG.info(_LI("Allow sorting is enabled because native "
                             "pagination requires native sorting"))
                self._allow_sorting = True

        if parent:
            self._parent_id_name = '%s_id' % parent['member_name']
            parent_part = '_%s' % parent['member_name']
        else:
            self._parent_id_name = None
            parent_part = ''
        self._plugin_handlers = {
            self.LIST: 'get%s_%s' % (parent_part, self._collection),
            self.SHOW: 'get%s_%s' % (parent_part, self._resource)
        }
        for action in [self.CREATE, self.UPDATE, self.DELETE]:
            self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
                                                         self._resource)

wsgi_resource.Resource其实就是对controller做了个包装,所以干活的还是controller。controller的关键代码如上,注意下面的几行:

        self._plugin_handlers = {
            self.LIST: 'get%s_%s' % (parent_part, self._collection),
            self.SHOW: 'get%s_%s' % (parent_part, self._resource)
        }
        for action in [self.CREATE, self.UPDATE, self.DELETE]:
            self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
                                                         self._resource)

比如对于port这个资源,collection就是ports,resource就是port,因此其self._plugin_handlers的self.LIST就是get_ports,self.show就是get_port,这样就将controller和具体的plugin的方法关联起来了。同时从上面mapper.collection可以看出,url的名字是从collection和resource得到的,也就是会得到ports相关的url。action是固定的,所以我们的controller会有对应的index、create、new等方法。因此可以看出,controller是一个通用的controller,根据resource的不同,其会在mapper下生成不同的资源相关的url,并且构造方法中的self._plugin_handlers会关联资源相关的method。可以猜测,对于controller的那些index、create等方法,其最终会调用self._plugin_handlers,根据action的不同调用不同的method,而这些method则是和resource的名字相关的,进而url和具体的plugin的action就串联起来了。

这里适合休息一下,因为到此为止我们看到了core_plugin的url和具体实现的映射。如果有疑问的可以在这边先消化下。与RESOURCES类似的还有一个SUB_RESOURCES的映射关系构造,代码是一样的。

现在neutronapiapp_v2_0就看完了,我们来看下plugin_aware_extension_middleware_factory。

plugin_aware_extension_middleware_factory:
上面的分析将核心的REST API生成过程给分析好了。这里来看扩展的。入口代码如下:

def plugin_aware_extension_middleware_factory(global_config, **local_config):
    """Paste factory."""
    def _factory(app):
        ext_mgr = PluginAwareExtensionManager.get_instance()
        return ExtensionMiddleware(app, ext_mgr=ext_mgr)
    return _factory

PluginAwareExtensionManager.get_instance()不多说了,从上面的分析可以知道这里拿到的是一个拥有extension和plugin的manager对象。来看下ExtensionMiddleware:

class ExtensionMiddleware(wsgi.Middleware):
    """Extensions middleware for WSGI."""

    def __init__(self, application,
                 ext_mgr=None):
        self.ext_mgr = (ext_mgr
                        or ExtensionManager(get_extensions_path()))
        mapper = routes.Mapper()

        # extended resources
        for resource in self.ext_mgr.get_resources():
            path_prefix = resource.path_prefix
            if resource.parent:
                path_prefix = (resource.path_prefix +
                               "/%s/{%s_id}" %
                               (resource.parent["collection_name"],
                                resource.parent["member_name"]))

            LOG.debug('Extended resource: %s',
                      resource.collection)
            for action, method in resource.collection_actions.iteritems():
                conditions = dict(method=[method])
                path = "/%s/%s" % (resource.collection, action)
                with mapper.submapper(controller=resource.controller,
                                      action=action,
                                      path_prefix=path_prefix,
                                      conditions=conditions) as submap:
                    submap.connect(path)
                    submap.connect("%s.:(format)" % path)

            mapper.resource(resource.collection, resource.collection,
                            controller=resource.controller,
                            member=resource.member_actions,
                            parent_resource=resource.parent,
                            path_prefix=path_prefix)

        # extended actions
        action_controllers = self._action_ext_controllers(application,
                                                          self.ext_mgr, mapper)
        for action in self.ext_mgr.get_actions():
            LOG.debug('Extended action: %s', action.action_name)
            controller = action_controllers[action.collection]
            controller.add_action(action.action_name, action.handler)

        # extended requests
        req_controllers = self._request_ext_controllers(application,
                                                        self.ext_mgr, mapper)
        for request_ext in self.ext_mgr.get_request_extensions():
            LOG.debug('Extended request: %s', request_ext.key)
            controller = req_controllers[request_ext.key]
            controller.add_handler(request_ext.handler)

        self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                          mapper)
        super(ExtensionMiddleware, self).__init__(application)

大致扫一下可以看到mapper、router啥的,大概也就知道这里肯定是做url的映射。和之前一样,我们一行一行看过来。这里再次复习下extension的三个功能:
a.直接实现了某种新的resource。比如实现了资源XXX,那么我可以通过/v2.0/XXX进行XXX资源的操作。这个是最类似于实现一个新的service_plugin的
b.对现有的某种资源添加某种操作功能。比如对于ports资源,我想有一个动作是做绑定(打个比方,不一定确切),则可以通过extension在现有的plugin基础上增加功能,比如对/v2.0/ports增加/action接口
c.对现有的某个REST API请求增加参数。比如对于/v2.0/ports我本来创建的时候什么参数都不用提供,现在我希望POST请求能带上参数NAME,则可以通过extension来实现

对于a和b,我们都是要提供url的,所以这里应该就会做这个事情。从注释中也可看到,先是实现‘extended resources’,然后是‘extended actions’,最后是‘extended requests’。

来看代码吧,首先:

for resource in self.ext_mgr.get_resources():

重点看下self.ext_mgr.get_resources():

    def get_resources(self):
        """Returns a list of ResourceExtension objects."""
        resources = []
        resources.append(ResourceExtension('extensions',
                                           ExtensionController(self)))
        for ext in self.extensions.itervalues():
            try:
                resources.extend(ext.get_resources())
            except AttributeError:
                # NOTE(dprince): Extension aren't required to have resource
                # extensions
                pass
        return resources

ResourceExtension代表一个资源:

class ResourceExtension(object):
    """Add top level resources to the OpenStack API in Neutron."""

    def __init__(self, collection, controller, parent=None, path_prefix="",
                 collection_actions={}, member_actions={}, attr_map={}):
        self.collection = collection
        self.controller = controller
        self.parent = parent
        self.collection_actions = collection_actions
        self.member_actions = member_actions
        self.path_prefix = path_prefix
        self.attr_map = attr_map

这里的每项的含义是:
self.collection:资源复数的名字,比如ports、networks。在REST API中,url一般是复数的,比如GET /v2.0/ports,复数的GET一般是通过filter来获取信息,而单数的GET,如/v2.0/ports/{id}则是通过id号获取
self.controller:资源代表的controller,用于url和action的映射
self.parent:父资源
self.collection_actions:复数资源允许的REST API操作,比如ports允许哪些操作(SHOW、INDEX这类,可以看下上面的map.collection)
self.member_actions:单个资源允许的REST API操作,比如port允许哪些操作,在URL中一般是包含{id}的
self.path_prefix:url前缀
self.attr_map:REST API参数属性信息

可以看到,一个ResourceExtension如果有controller的话,就能实现对应的url。回过去看get_resources方法:

        resources.append(ResourceExtension('extensions',
                                           ExtensionController(self)))

这里其实就是实现/v2.0/extensions接口的映射。ExtensionController的实现是:

class ExtensionController(wsgi.Controller):

    def __init__(self, extension_manager):
        self.extension_manager = extension_manager

    def _translate(self, ext):
        ext_data = {}
        ext_data['name'] = ext.get_name()
        ext_data['alias'] = ext.get_alias()
        ext_data['description'] = ext.get_description()
        ext_data['namespace'] = ext.get_namespace()
        ext_data['updated'] = ext.get_updated()
        ext_data['links'] = []  # TODO(dprince): implement extension links
        return ext_data

    def index(self, request):
        extensions = []
        for _alias, ext in self.extension_manager.extensions.iteritems():
            extensions.append(self._translate(ext))
        return dict(extensions=extensions)

    def show(self, request, id):
        # NOTE(dprince): the extensions alias is used as the 'id' for show
        ext = self.extension_manager.extensions.get(id, None)
        if not ext:
            raise webob.exc.HTTPNotFound(
                _("Extension with alias %s does not exist") % id)
        return dict(extension=self._translate(ext))

    def delete(self, request, id):
        msg = _('Resource not found.')
        raise webob.exc.HTTPNotFound(msg)

    def create(self, request):
        msg = _('Resource not found.')
        raise webob.exc.HTTPNotFound(msg)

可以看到,通过self.extension_manager.extensions.iteritems()可以获取extension的列表信息。

剩下的代码是对每个ext调用get_resource方法。对于那些只是扩充REST API HTTP参数的extension,get_resource会返回空,否则会返回一个ResourceExtension对象,包含具体的controller用于url的映射。对于这类extension,下面的代码会扩充URL:

            mapper.resource(resource.collection, resource.collection,
                            controller=resource.controller,
                            member=resource.member_actions,
                            parent_resource=resource.parent,
                            path_prefix=path_prefix)

比如securitygroup,根据官网extension的API列表可以看到其是有独立的url地址的,其get_resource的实现是:

    @classmethod
    def get_resources(cls):
        """Returns Ext Resources."""
        my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
        attr.PLURALS.update(dict(my_plurals))
        exts = []
        plugin = manager.NeutronManager.get_plugin()
        for resource_name in ['security_group', 'security_group_rule']:
            collection_name = resource_name.replace('_', '-') + "s"
            params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
            quota.QUOTAS.register_resource_by_name(resource_name)
            controller = base.create_resource(collection_name,
                                              resource_name,
                                              plugin, params, allow_bulk=True,
                                              allow_pagination=True,
                                              allow_sorting=True)

            ex = extensions.ResourceExtension(collection_name,
                                              controller,
                                              attr_map=params)
            exts.append(ex)

        return exts

也就是会得到一个拥有controller的extensions.ResourceExtension对象。

接下来看action的扩充:

        # extended actions
        action_controllers = self._action_ext_controllers(application,
                                                          self.ext_mgr, mapper)
        for action in self.ext_mgr.get_actions():
            LOG.debug('Extended action: %s', action.action_name)
            controller = action_controllers[action.collection]
            controller.add_action(action.action_name, action.handler)

首先是_action_ext_controllers:

    def _action_ext_controllers(self, application, ext_mgr, mapper):
        """Return a dict of ActionExtensionController-s by collection."""
        action_controllers = {}
        for action in ext_mgr.get_actions():
            if action.collection not in action_controllers.keys():
                controller = ActionExtensionController(application)
                mapper.connect("/%s/:(id)/action.:(format)" %
                               action.collection,
                               action='action',
                               controller=controller,
                               conditions=dict(method=['POST']))
                mapper.connect("/%s/:(id)/action" % action.collection,
                               action='action',
                               controller=controller,
                               conditions=dict(method=['POST']))
                action_controllers[action.collection] = controller

        return action_controllers

ext_mgr.get_actions类似上面分析的ext_mgr.get_resources,其会调用每个extension的get_action方法。如果某个extension是扩展ports的url,那么action.collections就是ports,于是就会简历一个对应的controller。这个url对应了/port/{id}/action和/port/{id}/action.XXX两个url。看下这个controller:

class ActionExtensionController(wsgi.Controller):

    def __init__(self, application):
        self.application = application
        self.action_handlers = {}

    def add_action(self, action_name, handler):
        self.action_handlers[action_name] = handler

    def action(self, request, id):
        input_dict = self._deserialize(request.body,
                                       request.get_content_type())
        for action_name, handler in self.action_handlers.iteritems():
            if action_name in input_dict:
                return handler(input_dict, request, id)
        # no action handler found (bump to downstream application)
        response = self.application
        return response

可以看到,这个controller的action方法会的根据请求的参数调用具体的handler,也就是extension的具体方法。handler的注册如下:

        for action in self.ext_mgr.get_actions():
            LOG.debug('Extended action: %s', action.action_name)
            controller = action_controllers[action.collection]
            controller.add_action(action.action_name, action.handler)

最后来看extended requests的注册:

        # extended requests
        req_controllers = self._request_ext_controllers(application,
                                                        self.ext_mgr, mapper)
        for request_ext in self.ext_mgr.get_request_extensions():
            LOG.debug('Extended request: %s', request_ext.key)
            controller = req_controllers[request_ext.key]
            controller.add_handler(request_ext.handler)

基本和action是一样的。_request_ext_controllers的实现如下:

    def _request_ext_controllers(self, application, ext_mgr, mapper):
        """Returns a dict of RequestExtensionController-s by collection."""
        request_ext_controllers = {}
        for req_ext in ext_mgr.get_request_extensions():
            if req_ext.key not in request_ext_controllers.keys():
                controller = RequestExtensionController(application)
                mapper.connect(req_ext.url_route + '.:(format)',
                               action='process',
                               controller=controller,
                               conditions=req_ext.conditions)

                mapper.connect(req_ext.url_route,
                               action='process',
                               controller=controller,
                               conditions=req_ext.conditions)
                request_ext_controllers[req_ext.key] = controller

        return request_ext_controllers

其会调用每个extension的get_request_extensions方法获取req_ext。从RequestExtensionController可以看出其处理逻辑:

class RequestExtensionController(wsgi.Controller):

    def __init__(self, application):
        self.application = application
        self.handlers = []

    def add_handler(self, handler):
        self.handlers.append(handler)

    def process(self, request, *args, **kwargs):
        res = request.get_response(self.application)
        # currently request handlers are un-ordered
        for handler in self.handlers:
            response = handler(request, res)
        return response

可以看到,对于指定的要扩充参数的url,其会绑定到自己的process方法,process方法会先调用self.application做具体的事情,然后对于结果和请求调用handler做额外的处理,然后返回结果。

handler的注册则和上面的action extension是一样的。

ok,又可以休息下啦,至此我们看到了三种extension类型的注册方法,总结下就是:
a.对于资源扩充的,extension需要实现get_resource方法,该方法要返回一个拥有controller的ResourceExtension对象。mapper.resource会将其并入到url中
b.对于增加action的,extension需要实现get_action方法,注册extension的时候会的给对应的url增加/action的接口,该接口会调用具体的extension的实现来执行具体的代码
c.对于增加request参数的,extension需要实现get_request_extension方法,此时对应的url其实会绑定到新的controller的process方法上,只是这个process方法会先调用正常的process代码,然后对response和request做加工,最后返回真正的response

看到现在可能会有个疑问,对于core_plugin和extensions目录下的文件,我都加载了,那么对于其他如lbaas、fwaas之类的service_plugin的加载是在哪里加载的呢?其实对于这类服务,都是通过extension来做扩展的。如果我们看下extension的代码,根据上面我们知道,如果要增加新的resource,需要实现get_resource方法。比如:

    @classmethod
    def get_resources(cls):
        """Returns Ext Resources."""
        plural_mappings = resource_helper.build_plural_mappings(
            {}, RESOURCE_ATTRIBUTE_MAP)
        plural_mappings['external_fixed_ips'] = 'external_fixed_ip'
        attr.PLURALS.update(plural_mappings)
        action_map = {'router': {'add_router_interface': 'PUT',
                                 'remove_router_interface': 'PUT'}}
        return resource_helper.build_resource_info(plural_mappings,
                                                   RESOURCE_ATTRIBUTE_MAP,
                                                   constants.L3_ROUTER_NAT,
                                                   action_map=action_map,
                                                   register_quota=True)

看下这里的resource_helper.build_resource_info,这边应该会生成对应的controller,其实现是:

def build_resource_info(plural_mappings, resource_map, which_service,
                        action_map=None, register_quota=False,
                        translate_name=False, allow_bulk=False):
    """Build resources for advanced services.

    Takes the resource information, and singular/plural mappings, and creates
    API resource objects for advanced services extensions. Will optionally
    translate underscores to dashes in resource names, register the resource,
    and accept action information for resources.

    :param plural_mappings: mappings between singular and plural forms
    :param resource_map: attribute map for the WSGI resources to create
    :param which_service: The name of the service for which the WSGI resources
                          are being created. This name will be used to pass
                          the appropriate plugin to the WSGI resource.
                          It can be set to None or "CORE" to create WSGI
                          resources for the core plugin
    :param action_map: custom resource actions
    :param register_quota: it can be set to True to register quotas for the
                           resource(s) being created
    :param translate_name: replaces underscores with dashes
    :param allow_bulk: True if bulk create are allowed
    """
    resources = []
    if not which_service:
        which_service = constants.CORE
    if action_map is None:
        action_map = {}
    if which_service != constants.CORE:
        plugin = manager.NeutronManager.get_service_plugins()[which_service]
    else:
        plugin = manager.NeutronManager.get_plugin()
    for collection_name in resource_map:
        resource_name = plural_mappings[collection_name]
        params = resource_map.get(collection_name, {})
        if translate_name:
            collection_name = collection_name.replace('_', '-')
        if register_quota:
            quota.QUOTAS.register_resource_by_name(resource_name)
        member_actions = action_map.get(resource_name, {})
        controller = base.create_resource(
            collection_name, resource_name, plugin, params,
            member_actions=member_actions,
            allow_bulk=allow_bulk,
            allow_pagination=cfg.CONF.allow_pagination,
            allow_sorting=cfg.CONF.allow_sorting)
        resource = extensions.ResourceExtension(
            collection_name,
            controller,
            path_prefix=constants.COMMON_PREFIXES[which_service],
            member_actions=member_actions,
            attr_map=params)
        resources.append(resource)
    return resources

重点是这里的plugin = manager.NeutronManager.get_service_plugins()[which_service],如果使用的不是core_plugin,则这边会调用service_plugin的各种方法实现我们的controller。具体的实现和core_plugin是一样的,根据mapper.collection会生成各种method的get/delete/update等方法。也就是说,真正干活的代码是service_plugin实现的,extension只是实现了URL到controller action的映射。或许以后core_plugin也会通过这种方式实现吧。

好了,这里可以总结下了。目前的plugin分为core和service两类,这两类以后或许会都称为servie plugin。service plugin有各种get/delete/update等真正干活的方法,但是呢没有和URL绑定映射关系。对于service_plugin,URL的映射关系由extension来实现。对于core_plugin则是代码里写死了RESOURCE的类型。不过对于这两类plugin的URL的映射绑定方法实质上是相同的,都是用了mapper.collection。extension分为三类,每一类都要实现具体的某个方法。所以,extension既然是和URL有关的,那么extension其实可以叫做API extension啦。

另外从上面的代码也可以看出如何实现一个extension以及实现core_plugin或service_plugin。接下来有时间会自己写一个然后分享下。

3 Responses

  1. MatheMatrix 2015年4月17日 / 下午6:07

    nice post 😀

    说起来,前几天我们刚发现了一个 bug 就是这里的:
    “attr_map[resource] = resource_attrs”
    如果 resource_attrs 是通过 generator 生成的,而且直接用等号会使多个 resource 指向到同一个对象,进而引发混乱

    • thuanqin 2015年4月21日 / 下午1:57

      一开始看了下觉得没问题,去launchpad上搜了下才明白是咋回事,好隐含的bug… =-=

  2. K.Yang 2016年9月23日 / 下午4:33

    Nice,API 这部分梳理的非常清晰

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*