An Analysis of the eventlet Source Code

eventlet is a concurrent networking library written in Python, built on greenlet plus epoll or libevent. Here is a simple example:

[root@RHEL01 tmp]# vi test.py
import eventlet
from eventlet.green import urllib2

def print_html_return(url):
    body = urllib2.urlopen(url).read()
    print('html body len is {0}'.format(len(body)))

gt = eventlet.spawn(print_html_return, 'http://www.taobao.com')
gt2 = eventlet.spawn(print_html_return, 'http://www.tmall.com')
gt3 = eventlet.spawn(print_html_return, 'http://www.alibaba.com')

gt.wait()
gt2.wait()
gt3.wait()

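In this example we fetch three URLs and print the length of each returned HTML body. Unlike the usual approach, we use eventlet's urllib2 module, which eventlet has "greened". Once the module is greened, its read method no longer blocks the whole program: instead of stalling while it waits for the site to return the body, the code yields control and, through coroutines (see Xiao Qin's article on that topic), lets other code that is ready to run proceed.

In this article, Xiao Qin analyzes the coroutine scheduling code in eventlet, when eventlet uses epoll, and what "greening" means.

Let's trace the code using the example above. First, look at what spawn does: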

def spawn(func, *args, **kwargs):
    """Create a greenthread to run ``func(*args, **kwargs)``.  Returns a
    :class:`GreenThread` object which you can use to get the results of the
    call.

    Execution control returns immediately to the caller; the created greenthread
    is merely scheduled to be run at the next available opportunity.
    Use :func:`spawn_after` to  arrange for greenthreads to be spawned
    after a finite delay.
    """
    hub = hubs.get_hub()
    g = GreenThread(hub.greenlet)
    hub.schedule_call_global(0, g.switch, func, args, kwargs)
    return g

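As you can see, the first step is to get a hub. What is a hub? Put simply, a hub is a tool for managing coroutines. Its implementation is a loop: the first half of each iteration runs the tasks that are ready to run, and the second half uses a mechanism such as epoll to wait for fds that have become ready. We will analyze the hub code in detail below; for now, look at get_hub: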

def get_hub():
    """Get the current event hub singleton object.

    .. note :: |internal|
    """
    try:
        hub = _threadlocal.hub
    except AttributeError:
        try:
            _threadlocal.Hub
        except AttributeError:
            use_hub()
        hub = _threadlocal.hub = _threadlocal.Hub()
    return hub

_threadlocal is a thread-local variable:

_threadlocal = threading.local()
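
To see why a thread-local gives each thread its own hub, here is a minimal sketch using only the standard library. DemoHub and get_demo_hub are made-up names for illustration, not eventlet's API; the sketch only relies on threading.local storing attributes per thread.

```python
import threading

# Attributes set on this object are visible only to the thread that set them.
_local = threading.local()


class DemoHub(object):
    """Hypothetical stand-in for a hub class."""


def get_demo_hub():
    """Return this thread's hub, creating it lazily on first use."""
    try:
        return _local.hub
    except AttributeError:
        hub = _local.hub = DemoHub()
        return hub


def hub_in_new_thread():
    """Fetch the hub from inside a fresh thread and hand it back."""
    box = []
    worker = threading.Thread(target=lambda: box.append(get_demo_hub()))
    worker.start()
    worker.join()
    return box[0]


main_hub = get_demo_hub()
same_thread_again = get_demo_hub()
other_thread_hub = hub_in_new_thread()
```

Within one thread the second call returns the cached object, while the other thread lazily builds its own.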

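So get_hub first checks whether a hub has already been created for this thread. If not, it checks whether the class _threadlocal.Hub exists, and if that is missing too, it calls use_hub to obtain a Hub class. Since this is our first call to spawn, nothing exists yet, so we end up in use_hub: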

def use_hub(mod=None):
    """Use the module *mod*, containing a class called Hub, as the
    event hub. Usually not required; the default hub is usually fine.

    Mod can be an actual module, a string, or None.  If *mod* is a module,
    it uses it directly.   If *mod* is a string and contains either '.' or ':'
    use_hub tries to import the hub using the 'package.subpackage.module:Class'
    convention, otherwise use_hub looks for a matching setuptools entry point
    in the 'eventlet.hubs' group to load or finally tries to import
    `eventlet.hubs.mod` and use that as the hub module.  If *mod* is None,
    use_hub uses the default hub.  Only call use_hub during application
    initialization,  because it resets the hub's state and any existing
    timers or listeners will never be resumed.
    """
    if mod is None:
        mod = os.environ.get('EVENTLET_HUB', None)
    if mod is None:
        mod = get_default_hub()
    if hasattr(_threadlocal, 'hub'):
        del _threadlocal.hub
    if isinstance(mod, six.string_types):
        assert mod.strip(), "Need to specify a hub"
        if '.' in mod or ':' in mod:
            modulename, _, classname = mod.strip().partition(':')
            mod = __import__(modulename, globals(), locals(), [classname])
            if classname:
                mod = getattr(mod, classname)
        else:
            found = False
            if pkg_resources is not None:
                for entry in pkg_resources.iter_entry_points(
                        group='eventlet.hubs', name=mod):
                    mod, found = entry.load(), True
                    break
            if not found:
                mod = __import__(
                    'eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
    if hasattr(mod, 'Hub'):
        _threadlocal.Hub = mod.Hub
    else:
        _threadlocal.Hub = mod

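The code is short and its job is simple: import the Hub module we need. Here get_default_hub tries imports to determine which hub is usable, and in our environment the module that ends up being imported is eventlet.hubs.epolls.

OK, we now have a Hub class; the next step is to create the Hub object. The constructor in epolls is: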
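
The 'package.module:Class' branch of use_hub can be sketched as follows. load_by_spec is a hypothetical helper, and it uses importlib.import_module instead of the __import__ call in the original, so treat it as an illustration of the naming convention rather than eventlet's own code.

```python
import importlib


def load_by_spec(spec):
    """Resolve 'package.module:Attr' to an object, or 'package.module' to a module."""
    spec = spec.strip()
    if not spec:
        raise ValueError("need a non-empty spec")
    # partition always returns three parts; attr_name is '' when ':' is absent.
    module_name, _, attr_name = spec.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name) if attr_name else module


ordered_dict_cls = load_by_spec("collections:OrderedDict")
json_module = load_by_spec("json")
```

Using partition rather than split keeps the no-colon case simple, which is presumably why the original takes the same approach.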

class Hub(poll.Hub):
    def __init__(self, clock=time.time):
        BaseHub.__init__(self, clock)
        self.poll = epoll()
        try:
            # modify is required by select.epoll
            self.modify = self.poll.modify
        except AttributeError:
            self.modify = self.poll.register

It has a poll attribute, which is the epoll object used later on; we will see how it is used below. The parent class's initializer is also important:

class BaseHub(object):
    """ Base hub class for easing the implementation of subclasses that are
    specific to a particular underlying event architecture. """

    SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)

    READ = READ
    WRITE = WRITE

    def __init__(self, clock=time.time):
        self.listeners = {READ: {}, WRITE: {}}
        self.secondaries = {READ: {}, WRITE: {}}
        self.closed = []

        self.clock = clock
        self.greenlet = greenlet.greenlet(self.run)
        self.stopping = False
        self.running = False
        self.timers = []
        self.next_timers = []
        self.lclass = FdListener
        self.timers_canceled = 0
        self.debug_exceptions = True
        self.debug_blocking = False
        self.debug_blocking_resolution = 1

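The line self.greenlet = greenlet.greenlet(self.run) is the core of coroutine scheduling: the hub is itself a coroutine, and the code it runs is self.run. As a spoiler: when a function started through something like spawn would block and gives up control, control returns to the hub, that is, to the hub's self.run method. So self.run is our core scheduler.

Leaving the hub aside for now, let's continue with our code. After getting the hub, spawn runs: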
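
Since greenlet is a third-party package, the sketch below uses plain generators as a stand-in to illustrate the idea of a central loop that tasks hand control back to. run_hub and worker are hypothetical names, and a bare yield plays the role of switching to the hub. Real greenlets can switch from anywhere in the call stack, which generators cannot, so this is only an analogy.

```python
from collections import deque


def worker(name, steps, log):
    """A task that does a little work, then hands control back each step."""
    for i in range(steps):
        log.append("{0} step {1}".format(name, i))
        yield  # analogous to switching back to the hub


def run_hub(tasks):
    """Resume each ready task in turn until all of them have finished."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # task finished, do not reschedule it
        ready.append(task)


log = []
run_hub([worker("a", 2, log), worker("b", 3, log)])
```

The log ends up interleaved ("a step 0", "b step 0", "a step 1", ...), because every task returns to the central loop between steps.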

g = GreenThread(hub.greenlet)

GreenThread is just a wrapper around greenlet:

class GreenThread(greenlet.greenlet):
    """The GreenThread class is a type of Greenlet which has the additional
    property of being able to retrieve the return value of the main function.
    Do not construct GreenThread objects directly; call :func:`spawn` to get one.
    """

    def __init__(self, parent):
        greenlet.greenlet.__init__(self, self.main, parent)
        self._exit_event = event.Event()
        self._resolving_links = False

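So g = GreenThread(hub.greenlet) creates a new coroutine, that is, a new greenlet whose parent is the hub's greenlet, the one that runs the run method mentioned above. The function this coroutine itself runs is self.main: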

    def main(self, function, args, kwargs):
        try:
            result = function(*args, **kwargs)
        except:
            self._exit_event.send_exception(*sys.exc_info())
            self._resolve_links()
            raise
        else:
            self._exit_event.send(result)
            self._resolve_links()

When the coroutine's function finishes, it sends the result (or the exception) to the event, and at that point control can return to __main__.
Continuing with the code:

hub.schedule_call_global(0, g.switch, func, args, kwargs)

Its implementation is:

    def schedule_call_global(self, seconds, cb, *args, **kw):
        """Schedule a callable to be called after 'seconds' seconds have
        elapsed. The timer will NOT be canceled if the current greenlet has
        exited before the timer fires.
            seconds: The number of seconds to wait.
            cb: The callable to call after the given time.
            *args: Arguments to pass to the callable when called.
            **kw: Keyword arguments to pass to the callable when called.
        """
        t = timer.Timer(seconds, cb, *args, **kw)
        self.add_timer(t)
        return t

This creates a timer object and then uses add_timer to add the coroutine created by spawn to the hub's next_timers:

    def add_timer(self, timer):
        scheduled_time = self.clock() + timer.seconds
        self.next_timers.append((scheduled_time, timer))
        return scheduled_time

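After that, we are back in the main code of our example. Once the remaining two spawn calls have run, we have one hub whose next_timers list holds three timers, and each timer carries g.switch, the entry point for running that greenlet. At this point print_html_return has still not started, execution is still in our main code, and the hub's run method has not started either. So let's keep going and look at what wait does: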

    def wait(self):
        """ Returns the result of the main function of this GreenThread.  If the
        result is a normal return value, :meth:`wait` returns it.  If it raised
        an exception, :meth:`wait` will raise the same exception (though the
        stack trace will unavoidably contain some frames from within the
        greenthread module)."""
        return self._exit_event.wait()

The wait method of _exit_event is:

    def wait(self):
        """Wait until another coroutine calls :meth:`send`.
        Returns the value the other coroutine passed to
        :meth:`send`.

        >>> from eventlet import event
        >>> import eventlet
        >>> evt = event.Event()
        >>> def wait_on():
        ...    retval = evt.wait()
        ...    print("waited for {0}".format(retval))
        >>> _ = eventlet.spawn(wait_on)
        >>> evt.send('result')
        >>> eventlet.sleep(0)
        waited for result

        Returns immediately if the event has already
        occured.

        >>> evt.wait()
        'result'
        """
        current = greenlet.getcurrent()
        if self._result is NOT_USED:
            self._waiters.add(current)
            try:
                return hubs.get_hub().switch()
            finally:
                self._waiters.discard(current)
        if self._exc is not None:
            current.throw(*self._exc)
        return self._result

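When wait is called on g, the current greenlet (here __main__) is added to the _waiters of g's event object, and then the hub's switch is called, which switches to the hub and runs its run method. This is where the hub really starts running. Here is the hub's switch: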

    def switch(self):
        cur = greenlet.getcurrent()
        assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
        switch_out = getattr(cur, 'switch_out', None)
        if switch_out is not None:
            try:
                switch_out()
            except:
                self.squelch_generic_exception(sys.exc_info())
        self.ensure_greenlet()
        try:
            if self.greenlet.parent is not cur:
                cur.parent = self.greenlet
        except ValueError:
            pass  # gets raised if there is a greenlet parent cycle
        clear_sys_exc_info()
        return self.greenlet.switch()

After a few checks, the final self.greenlet.switch() switches into the hub's run. Here is run:

    def run(self, *a, **kw):
        """Run the runloop until abort is called.
        """
        # accept and discard variable arguments because they will be
        # supplied if other greenlets have run and exited before the
        # hub's greenlet gets a chance to run
        if self.running:
            raise RuntimeError("Already running!")
        try:
            self.running = True
            self.stopping = False
            while not self.stopping:
                while self.closed:
                    # We ditch all of these first.
                    self.close_one()
                self.prepare_timers()
                if self.debug_blocking:
                    self.block_detect_pre()
                self.fire_timers(self.clock())
                if self.debug_blocking:
                    self.block_detect_post()
                self.prepare_timers()
                wakeup_when = self.sleep_until()
                if wakeup_when is None:
                    sleep_time = self.default_sleep()
                else:
                    sleep_time = wakeup_when - self.clock()
                if sleep_time > 0:
                    self.wait(sleep_time)
                else:
                    self.wait(0)
            else:
                self.timers_canceled = 0
                del self.timers[:]
                del self.next_timers[:]
        finally:
            self.running = False
            self.stopping = False

The code is short, and therefore elegant. Let's look at how the important parts are implemented:

self.prepare_timers()

These timers are the ones registered earlier by spawn. prepare_timers is implemented as:

    def prepare_timers(self):
        heappush = heapq.heappush
        t = self.timers
        for item in self.next_timers:
            if item[1].called:
                self.timers_canceled -= 1
            else:
                heappush(t, item)
        del self.next_timers[:]

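This iterates over the next_timers list, pushes every timer that has not been called yet onto a heap, and then clears next_timers. Next comes the call self.fire_timers(self.clock()), which is implemented as:

    def fire_timers(self, when):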
        t = self.timers
        heappop = heapq.heappop

        while t:
            next = t[0]

            exp = next[0]
            timer = next[1]

            if when < exp:
                break

            heappop(t)

            try:
                if timer.called:
                    self.timers_canceled -= 1
                else:
                    timer()
            except self.SYSTEM_EXCEPTIONS:
                raise
            except:
                self.squelch_timer_exception(timer, sys.exc_info())
                clear_sys_exc_info()

This checks for timers whose scheduled time has arrived and, when there is one, calls timer(), which is implemented as:

    def __call__(self, *args):
        if not self.called:
            self.called = True
            cb, args, kw = self.tpl
            try:
                cb(*args, **kw)
            finally:
                try:
                    del self.tpl
                except AttributeError:
                    pass

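So calling the timer invokes g.switch, which in turn runs our print_html_return. You might ask: if that is all, where is the asynchrony? Where is epoll? To keep the walkthrough moving, Xiao Qin will tell you up front that when urllib2.urlopen(url).read() inside print_html_return has sent its request and starts waiting for data, it registers the read fd of its socket with the hub (because it has been "greened") and then yields control to the hub. So at this point all three timers have fired, none of them has finished because of the pending read, each has registered its fd with the hub, and control is back in the hub. Let's continue with the code: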

                if wakeup_when is None:
                    sleep_time = self.default_sleep()
                else:
                    sleep_time = wakeup_when - self.clock()
                if sleep_time > 0:
                    self.wait(sleep_time)
                else:
                    self.wait(0)

wait is a key method; the biggest difference between hubs is how they implement wait. Here is the poll hub's wait:

    def wait(self, seconds=None):
        readers = self.listeners[READ]
        writers = self.listeners[WRITE]

        if not readers and not writers:
            if seconds:
                sleep(seconds)
            return
        try:
            presult = self.do_poll(seconds)
        except (IOError, select.error) as e:
            if get_errno(e) == errno.EINTR:
                return
            raise
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS

        if self.debug_blocking:
            self.block_detect_pre()

        # Accumulate the listeners to call back to prior to
        # triggering any of them. This is to keep the set
        # of callbacks in sync with the events we've just
        # polled for. It prevents one handler from invalidating
        # another.
        callbacks = set()
        for fileno, event in presult:
            if event & READ_MASK:
                callbacks.add((readers.get(fileno, noop), fileno))
            if event & WRITE_MASK:
                callbacks.add((writers.get(fileno, noop), fileno))
            if event & select.POLLNVAL:
                self.remove_descriptor(fileno)
                continue
            if event & EXC_MASK:
                callbacks.add((readers.get(fileno, noop), fileno))
                callbacks.add((writers.get(fileno, noop), fileno))

        for listener, fileno in callbacks:
            try:
                listener.cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:
                self.squelch_exception(fileno, sys.exc_info())
                clear_sys_exc_info()

        if self.debug_blocking:
            self.block_detect_post()

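As Xiao Qin said above, when urllib2.urlopen(url).read() in print_html_return has sent its request and starts waiting for data, it registers the read fd of its socket with the hub, so the following two lines pick up the fds that are waiting for data: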

        readers = self.listeners[READ]
        writers = self.listeners[WRITE]

Next comes do_poll. In the epoll hub, do_poll is just a plain epoll call:

    def do_poll(self, seconds):
        return self.poll.poll(seconds)

self.poll, mentioned above, is a Python epoll object. Once we have the fds that are ready, the following runs:

        for listener, fileno in callbacks:
            try:
                listener.cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:
                self.squelch_exception(fileno, sys.exc_info())
                clear_sys_exc_info()

It is not hard to guess that urllib2.urlopen(url).read() registers a callback before yielding control, so that it can be called back here.
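
The register-then-call-back pattern can be tried out with the standard selectors module, which wraps epoll where it is available. This is a minimal sketch, not eventlet's listener machinery: poll_once and on_readable are hypothetical names, and a socketpair stands in for a real network connection.

```python
import selectors
import socket


def poll_once(selector, timeout):
    """Collect the ready callbacks first, then invoke them one by one."""
    callbacks = [(key.data, key.fd) for key, _events in selector.select(timeout)]
    for callback, fileno in callbacks:
        callback(fileno)
    return len(callbacks)


left, right = socket.socketpair()
selector = selectors.DefaultSelector()
received = []


def on_readable(fileno):
    """Callback stored at registration time, run when the fd is readable."""
    received.append(left.recv(1024))


# Register the read side together with the callback to run when it is ready.
selector.register(left, selectors.EVENT_READ, on_readable)

idle_count = poll_once(selector, 0)     # nothing has been sent yet
right.sendall(b"ping")
ready_count = poll_once(selector, 1.0)  # now the read side is ready

selector.close()
left.close()
right.close()
```

Gathering the callbacks before running any of them mirrors the comment in the hub's wait about keeping the callback set in sync with the events just polled.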

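OK, the basic flow of the hub is now fairly clear. To summarize:
1. spawn creates a GreenThread. The GreenThread wraps the func it is to run in self.main and adds it to next_timers through a timer; waiting on the event then switches into the hub's run, and the caller waits for the event to return. After func has run, self.main sends on the event, which lets execution return to __main__.
2. The hub's run is a loop. The loop first checks next_timers for coroutines to run and runs them. After all due timers have run, it performs the network step (epoll or similar), which handles the fd waits of the funcs running in the spawned coroutines.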
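
The timer half of the loop summarized above can be sketched with heapq alone. MiniTimerQueue is a hypothetical class, the clock is passed in by hand so the run is deterministic, and the sequence counter used as a tie-breaker is an assumption of this sketch rather than something taken from eventlet.

```python
import heapq
import itertools


class MiniTimerQueue(object):
    """Two-stage timer queue: a staging list plus a heap ordered by due time."""

    def __init__(self):
        self.timers = []       # heap of (due_time, seq, callback)
        self.next_timers = []  # newly scheduled entries, not yet in the heap
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared

    def schedule(self, now, seconds, callback):
        self.next_timers.append((now + seconds, next(self._seq), callback))

    def prepare(self):
        """Move everything from the staging list into the heap."""
        for item in self.next_timers:
            heapq.heappush(self.timers, item)
        del self.next_timers[:]

    def fire(self, now):
        """Run every timer whose due time is not later than now."""
        while self.timers and self.timers[0][0] <= now:
            _due, _seq, callback = heapq.heappop(self.timers)
            callback()


fired = []
queue = MiniTimerQueue()
queue.schedule(0.0, 5.0, lambda: fired.append("late"))
queue.schedule(0.0, 0.0, lambda: fired.append("first"))
queue.schedule(0.0, 0.0, lambda: fired.append("second"))
queue.prepare()
queue.fire(now=0.0)
fired_at_zero = list(fired)
queue.fire(now=5.0)
```

One plausible benefit of the staging list is that a callback which schedules a new timer while fire is running does not modify the heap that is being drained.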

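Next, "greening". The goal of greening is to turn Python's native fd-based network calls into non-blocking ones that automatically add the fd to epoll or a similar mechanism, transparently to the user. After greening, a call on an fd that would block registers the fd with the hub and immediately yields control to the hub. The hub runs all due timers and then, in the second half of its loop, runs epoll and handles the fds that are waiting for data, which improves the performance of network calls.

Let's look at urllib2.urlopen(url).read(), starting with the green urllib2 module itself: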

from eventlet import patcher
from eventlet.green import ftplib
from eventlet.green import httplib
from eventlet.green import socket
from eventlet.green import time
from eventlet.green import urllib

patcher.inject(
    'urllib2',
    globals(),
    ('httplib', httplib),
    ('socket', socket),
    ('time', time),
    ('urllib', urllib))

FTPHandler.ftp_open = patcher.patch_function(FTPHandler.ftp_open, ('ftplib', ftplib))

del patcher

As you can see, importing eventlet.green.urllib2 runs patcher's inject function, which replaces Python's built-in modules with their green versions. The core of inject is:

    for name, mod in additional_modules:
        sys.modules[name] = mod
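
The effect of swapping entries in sys.modules can be seen in a small standalone experiment. This is a sketch under stated assumptions, not patcher.inject itself: exec_with_replacements and the fake time module are made up for illustration, and the "library" being patched is just a source string.

```python
import sys
import types

# A tiny "library" that imports the module we want to replace.
SOURCE = '''
import time

def nap(seconds):
    return time.sleep(seconds)
'''


def exec_with_replacements(source, replacements):
    """Run source while sys.modules temporarily points at the replacements."""
    saved = {name: sys.modules.get(name) for name in replacements}
    sys.modules.update(replacements)
    try:
        namespace = {}
        # 'import time' inside the source resolves through sys.modules first.
        exec(compile(source, "<demo>", "exec"), namespace)
        return namespace
    finally:
        # Restore the originals so the rest of the program is unaffected.
        for name, module in saved.items():
            if module is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = module


fake_time = types.ModuleType("time")
fake_time.sleep = lambda seconds: "would yield for {0}s".format(seconds)

patched = exec_with_replacements(SOURCE, {"time": fake_time})
nap_result = patched["nap"](3)
```

The patched namespace keeps its reference to the replacement module, while code elsewhere that imports time afterwards still gets the real one.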

So how do the greened modules cooperate with the hub? Let's look at the recv method of the green socket:

    def recv(self, buflen, flags=0):
        fd = self.fd
        if self.act_non_blocking:
            return fd.recv(buflen, flags)
        while True:
            try:
                return fd.recv(buflen, flags)
            except socket.error as e:
                if get_errno(e) in SOCKET_BLOCKING:
                    pass
                elif get_errno(e) in SOCKET_CLOSED:
                    return ''
                else:
                    raise
            try:
                self._trampoline(
                    fd,
                    read=True,
                    timeout=self.gettimeout(),
                    timeout_exc=socket.timeout("timed out"))
            except IOClosed as e:
                # Perhaps we should return '' instead?
                raise EOFError()

If get_errno(e) is in SOCKET_BLOCKING, execution reaches _trampoline, which calls trampoline, implemented as:

def trampoline(fd, read=None, write=None, timeout=None,
               timeout_exc=timeout.Timeout,
               mark_as_closed=None):
    """Suspend the current coroutine until the given socket object or file
    descriptor is ready to *read*, ready to *write*, or the specified
    *timeout* elapses, depending on arguments specified.

    To wait for *fd* to be ready to read, pass *read* ``=True``; ready to
    write, pass *write* ``=True``. To specify a timeout, pass the *timeout*
    argument in seconds.

    If the specified *timeout* elapses before the socket is ready to read or
    write, *timeout_exc* will be raised instead of ``trampoline()``
    returning normally.

    .. note :: |internal|
    """
    t = None
    hub = get_hub()
    current = greenlet.getcurrent()
    assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
    assert not (
        read and write), 'not allowed to trampoline for reading and writing'
    try:
        fileno = fd.fileno()
    except AttributeError:
        fileno = fd
    if timeout is not None:
        def _timeout(exc):
            # This is only useful to insert debugging
            current.throw(exc)
        t = hub.schedule_call_global(timeout, _timeout, timeout_exc)
    try:
        if read:
            listener = hub.add(hub.READ, fileno, current.switch, current.throw, mark_as_closed)
        elif write:
            listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
        try:
            return hub.switch()
        finally:
            hub.remove(listener)
    finally:
        if t is not None:
            t.cancel()

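Now it is clear: this is where our fd gets registered with the hub, after which scheduling proceeds as analyzed in the code above. When the fd becomes ready, current.switch resumes execution right here; back in recv, the while loop goes around once more and the data can be received normally.

Finally, a summary of what event is for and how spawn is scheduled:
1. spawn creates a GreenThread, which contains a greenlet and an event. The greenlet's func is self.main; the event will later record the current greenlet, that is __main__, in its waiters. The GreenThread's switch is also registered in next_timers. The current context is still __main__.
2. GreenThread's wait is the event's wait, which hands control to the hub. The context is now the hub.
3. In its loop, the hub reaches the GreenThread's switch, that is, it fires the timer registered in next_timers for the GreenThread. The context is now the GreenThread.
4. The GreenThread's self.main calls the user's func and then performs the event's send. The context is still the GreenThread.
5. send registers _do_send as a timer and then finishes; the context returns to the hub.
6. The hub schedules _do_send, which calls switch on the waiter registered with the event, and the context returns to __main__.

As you can see, event plays a very large role. It is what lets event.wait avoid blocking the hub while still delivering the required result. In essence, event.wait is a greenlet switch plus registering the current greenlet as a waiter, and event.send is a call to waiter.switch.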
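
The try, park, retry shape of the green recv can be imitated with generators and the standard selectors module. This is only an analogy: green_recv, reader, run and on_block are hypothetical names, a yield stands in for hub.switch(), and a socketpair replaces the real connection.

```python
import selectors
import socket


def green_recv(sock, buflen):
    """Try a non-blocking recv; if it would block, park and retry later."""
    while True:
        try:
            return sock.recv(buflen)
        except BlockingIOError:
            # Not ready yet: ask the driver to resume us when sock is readable.
            yield sock


def reader(sock, results):
    data = yield from green_recv(sock, 1024)
    results.append(data)


def run(task, selector, on_block):
    """Drive one task, parking it on the selector whenever it yields a socket."""
    while True:
        try:
            waiting_on = next(task)
        except StopIteration:
            return
        selector.register(waiting_on, selectors.EVENT_READ)
        try:
            on_block()                    # demo hook: make data arrive now
            selector.select(timeout=1.0)  # wait until the socket is readable
        finally:
            selector.unregister(waiting_on)


left, right = socket.socketpair()
left.setblocking(False)
selector = selectors.DefaultSelector()
results = []
blocked = []


def on_block():
    blocked.append(True)
    right.sendall(b"hello")


run(reader(left, results), selector, on_block)
selector.close()
left.close()
right.close()
```

The first recv fails because nothing has arrived, the task is parked until the selector reports the socket readable, and the second pass through the while loop returns the data.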
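
As a rough model of the wait/send pairing, the sketch below replaces greenlet switches with plain callbacks and the hub with a run queue. MiniEvent is a hypothetical class: wait takes a resume callback instead of suspending the caller, and send only queues the wake-ups, in the same spirit as _do_send being scheduled through a timer.

```python
from collections import deque


class MiniEvent(object):
    """Callback-based stand-in for an event with wait and send."""

    _NOT_USED = object()

    def __init__(self, run_queue):
        self._result = self._NOT_USED
        self._waiters = []
        self._run_queue = run_queue

    def wait(self, resume):
        """Register resume, or call it at once if a result is already there."""
        if self._result is self._NOT_USED:
            self._waiters.append(resume)
        else:
            resume(self._result)

    def send(self, result):
        """Store the result and queue the waiters; the loop wakes them later."""
        self._result = result
        waiters, self._waiters = self._waiters, []
        for resume in waiters:
            self._run_queue.append((resume, result))


run_queue = deque()
seen = []
event = MiniEvent(run_queue)

event.wait(seen.append)   # no result yet, so the waiter is parked
event.send("result")      # queues the wake-up, does not run it
seen_before_loop = list(seen)

while run_queue:          # the "hub" drains its queue
    resume, value = run_queue.popleft()
    resume(value)

event.wait(seen.append)   # result already present, resumes immediately
```

The waiter only sees the value once the loop runs, which corresponds to steps 5 and 6 in the summary above.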
