gevent源码初探-wsgi例子解析
发布日期:2021-07-25 13:04:40 浏览次数:8 分类:技术文章

本文共 22778 字,大约阅读时间需要 75 分钟。

gevent源码分析

本文环境gevent-0.9.0。

gevent简介

gevent是Python的一个并发框架,以协程库greenlet为基础,基于libev的高性能IO复用机制,其中可以使用monkey是程序中运行的IO阻塞操作转化为gevent中对应的非阻塞操作,从而在减少对程序代码的侵入性的情况下,达到搞性能的处理。

gevent示例

由于gevent底层是基于greenlet来实现的协程,首先先看下基本的greenlet的使用,

from greenlet import greenletdef test1():    print(12)    gr2.switch()    print(34)def test2():    print(56)    gr1.switch()    print(78)gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()

对于需要使用协程执行的函数,需要使用greenlet进行包装,当包装gr1,gr2之后,调用gr1的switch方法,使之进入协程test1运行,当test1打印出12之后,gr2调用了switch函数,然后就跳转执行gr2协程此时就执行test2,当打印56之后,此时调用gr1.switch方法,此时就跳转回gr1继续从gr2离开的地方继续执行,此时就打印出34,此时就协程就直接执行完成,此时位于由于没有协程的执行切换,所以gr2是不会打印出78。

对greenlet有个基本认识后,此时需要提到的一点就是生成的greenlet可以指定一个parent,该parent就是当子协程执行完成dead时,就会跳转到parent中继续执行,有了这些概念之后,查看gevent-0.9.0中给的examples/wsgi.py的示例代码,

from gevent import wsgi, socketdef hello_world(env, start_response):    if env['PATH_INFO'] != '/':        start_response('404 Not Found', [('Content-Type', 'text/plain')])        return ['Not Found\r\n']    else:        start_response('200 OK', [('Content-Type', 'text/plain')])        return ["Hello World!\r\n"]wsgi.server(socket.tcp_listener(('', 8080)), hello_world)

接下来我们就根据该实例代码来浅析一下,gevent的执行过程。

gevent示例执行流程分析

socket.tcp_listenser函数分析

根据实例代码,首先查看gevent中的socket.tcp_listenser函数,

def tcp_listener(address, backlog=50):    """    Listen on the given (ip, port) *address* with a TCP socket.    Returns a socket object on which one should call ``accept()`` to    accept a connection on the newly bound socket.    Generally, the returned socket will be passed to ``tcp_server()``,    which accepts connections forever and spawns greenlets for    each incoming connection.    """    sock = GreenSocket()                                                # 创建GreenSocket    socket_bind_and_listen(sock, address, backlog=backlog)              # 绑定并监听端口    return sock                                                         # 返回连接

此时查看GreenSocket类,

class GreenSocket(object):    is_secure = False    timeout = None    def __init__(self, family_or_realsock=_socket.AF_INET, *args, **kwargs):        if isinstance(family_or_realsock, (int, long)):                         # 判断传入的是否是文件socket连接            fd = _original_socket(family_or_realsock, *args, **kwargs)        else:            fd = family_or_realsock                                             # 如果不是,则是一个文件描述符            assert not args, args            assert not kwargs, kwargs        set_nonblocking(fd)                                                     # 将该文件描述符设置成非阻塞        self.fd = fd        self._fileno = fd.fileno()                                              # 获取文件描述符值        self.recvbuffer = ''        self.closed = False        self.timeout = _socket.getdefaulttimeout()                              # 获取超时时间        # when client calls setblocking(0) or settimeout(0) the socket must        # act non-blocking         self.act_non_blocking = False                                           # 是否启动阻塞与非阻塞标志位

在初始化的时候,调用了set_nonblocking方法,

def set_nonblocking(fd):    try:        setblocking = fd.setblocking                                            # 如果是socket连接则有该属性    except AttributeError:        # This version of Python predates socket.setblocking()        import fcntl                                                            # 文件等就没有改属性        fileno = fd.fileno()        flags = fcntl.fcntl(fileno, fcntl.F_GETFL)        fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)               # 直接修改文件的访问模式为非阻塞    else:        # socket supports setblocking()        setblocking(0)                                                          # 设置成非阻塞

当GreenSocket初始化之后,继续查看socket_bind_and_listen方法,

def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):    set_reuse_addr(descriptor)                                          # 设置监听端口可以立马复用    descriptor.bind(addr)                                               # 监听端口    descriptor.listen(backlog)                                          # 设置监听队列数量    return descriptor

其中bind,listen都调用了GreenSocket实例的方法,调用时都调用了socket的原生的bind,listen方法。

此时socket.tcp_listener方法就执行完成了,此时socket已经监听了对应的地址和端口,并返回了GreenSocket实例,此时我们继续查看wsgi.server函数,

wsgi.server函数分析
def server(sock, site, log=None, environ=None, max_size=None, max_http_version=DEFAULT_MAX_HTTP_VERSION, protocol=HttpProtocol, server_event=None, minimum_chunk_size=None):    serv = Server(sock, sock.getsockname(), site, log, environ=None, max_http_version=max_http_version, protocol=protocol, minimum_chunk_size=minimum_chunk_size)  # 初始化server实例    if server_event is not None:                                                                    # 如果传入不为空则直接调用发送方法        server_event.send(serv)    if max_size is None:                                                                            # 如果没有传入值        max_size = DEFAULT_MAX_SIMULTANEOUS_REQUESTS                                                # 默认最大值为默认值    pool = Pool(size=max_size)                                                                      # 初始化pool    try:        host, port = sock.getsockname()                                                             # 获取连接的信息        port = ':%s' % (port, )        if sock.is_secure:            scheme = 'https'            if port == ':443':                port = ''        else:            scheme = 'http'            if port == ':80':                port = ''        print "(%s) wsgi starting up on %s://%s%s/" % (os.getpid(), scheme, host, port)        while True:                                                                                 # 循环接受连接请求            try:                try:                     client_socket = sock.accept()                                                   # 获取新的连接                except socket.error, e:                    if e[0] != errno.EPIPE and e[0] != errno.EBADF:                        raise                pool.execute_async(serv.process_request, client_socket)                             # 处理进入的连接            except KeyboardInterrupt:                print "wsgi exiting"                break    finally:        try:            sock.close()                                                                            # 程序结束后关闭连接        except socket.error, e:            if e[0] != errno.EPIPE:                traceback.print_exc()

其中Server的类的初始化过程如下,

class Server(BaseHTTPServer.HTTPServer):    def __init__(self, socket, address, app, log=None, environ=None, max_http_version=None, protocol=HttpProtocol, minimum_chunk_size=None):        self.outstanding_requests = 0           self.socket = socket                                        # 传入的socket        self.address = address                                      # 地址        if log:                                                     # 日志配置            self.log = log          else:            self.log = sys.stderr                                           self.app = app                                              # 传入的wsgi处理函数        self.environ = environ                                      # 传入的环境变量        self.max_http_version = max_http_version        self.protocol = protocol                                    # 处理请求的handler        self.pid = os.getpid()                                      # 获取pid        if minimum_chunk_size is not None:            protocol.minimum_chunk_size = minimum_chunk_size        # 设置块大小

可以看出Server继承自BaseHTTPServer.HTTPServer类,当实例化该类之后,就是实例化Pool,该类就是创建一个缓冲池,确保执行的任务当缓冲池空闲时就可以执行,大家有兴趣可以去查看具体实现,然后程序就会进入循环等待请求连接进来,即sock.accept(),此时调用的就是GreenSocket实例的accept函数,

def accept(self):    if self.act_non_blocking:                                               # 如果设置为True        return self.fd.accept()                                             # 直接接受请求并返回    fd = self.fd                                                            # 设置连接实例    while True:        res = socket_accept(fd)                                             # 接受请求        if res is not None:                                                 # 如果不为空            client, addr = res                                              # 获取连接            set_nonblocking(client)                                         # 设置非阻塞            return type(self)(client), addr                                 # 返回实例化GreenSocket实例        wait_reader(fd.fileno(), timeout=self.gettimeout(), timeout_exc=timeout) # 如果没有请求连接则注册读事件等待请求接入

其中socket_accept就是调用了传入fd的accept方法,当有请求连接进来的时候则返回实例化的GreenSocket,如果没有则注册读事件,调用wait_reader等待请求接入,

def wait_reader(fileno, timeout=-1, timeout_exc=TimeoutError):    evt = core.read(fileno, _wait_helper, timeout, (getcurrent(), timeout_exc))     # 向libevent中注册读事件,传入fd和注册回调方法和参数    try:         returned_ev = get_hub().switch()                                            # 切换到主parent中执行        assert evt is returned_ev, (evt, returned_ev)                       finally:        evt.cancel()                                                                # 当事件完成后取消该事件

假如此时还没有连接请求,此时就会调用wait_reader函数,其中先注册请求事件然后转入到主循环中执行,此时我们先看get_hub()函数,

def get_hub():    global _threadlocal    try:        hub = _threadlocal.hub                              # 检查该全局变量是否有hub属性    except AttributeError:        # do not import anything that can be monkey-patched at top level        import threading                                    # 没有的话,则设置成线程安全的值        _threadlocal = threading.local()        hub = _threadlocal.hub = Hub()                      # 设置为Hub类的实例    return hub

继续查看Hub类,

class Hub(object):    def __init__(self):        self.greenlet = Greenlet(self.run)                      # 将self.run包装成协程        self.keyboard_interrupt_signal = None    def switch(self):        cur = getcurrent()                                      # 获取当前协程        assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'        switch_out = getattr(cur, 'switch_out', None)           # 如果当前协程有switch_out属性则直接执行该方法        if switch_out is not None:            try:                switch_out()            except:                traceback.print_exception(*sys.exc_info())        if self.greenlet.dead:                                  # 检查当前协程是否死亡            self.greenlet = Greenlet(self.run)                  # 如果死亡则重新初始化        return self.greenlet.switch()                           # 调用switch函数切换协程    def run(self, *args, **kwargs):        if self.keyboard_interrupt_signal is None:            self.keyboard_interrupt_signal = signal(2, MAIN.throw, KeyboardInterrupt)        while True:            result = core.dispatch()                            # 调用libevent的事件调度函数,去检查是否有注册的事件发生            if result>0:                return 'Hub.run() has finished because there are no events registered'            elif result<0:                return 'Hub.run() has finished because there was an error'            return result

由此可知,get_hub().switch()函数,就是调用了Hub类的switch函数,该函数就是调用执行了Hub类的run方法,来循环检查是否有注册事件发生,此时会一直循环等待直到有注册的时间发生并处理。

此时假如有请求连接进来,此时就会调用刚刚wait_reader中注册的_wait_helper函数,

def _wait_helper(ev, fd, evtype):    current, timeout_exc = ev.arg                       # 获取参数值    if evtype & core.EV_TIMEOUT:        current.throw(timeout_exc)                      # 如果超时则报错    else:        current.switch(ev)                              # 切换到传入的协程中继续执行并将ev返回

此时就切换回了位于GreenSocket实例的accept中的while True循环中继续执行,此时就通过socket_accept()接受接入的请求,此时就返回一个新的GreenSocket实例。

当接入新的请求后继续返回server函数中,执行

pool.execute_async(serv.process_request, client_socket)

此时查看Pool的该方法,

def execute_async(self, func, *args, **kwargs):    if self.sem.locked():        return spawn_greenlet(self.execute, func, *args, **kwargs)          # 等待处理    else:        return self.execute(func, *args, **kwargs)                          # 直接执行

此时继续查看self.execute函数,

def execute(self, func, *args, **kwargs):    """Execute func in one of the coroutines maintained    by the pool, when one is free.    Immediately returns a Proc object which can be queried    for the func's result.    >>> pool = Pool()    >>> task = pool.execute(lambda a: ('foo', a), 1)    >>> task.wait()    ('foo', 1)    """    # if reentering an empty pool, don't try to wait on a coroutine freeing    # itself -- instead, just execute in the current coroutine    if self.sem.locked() and gevent.getcurrent() in self.procs:         # 检查缓冲池中是否存在            p = spawn(func, *args, **kwargs)        try:            p.wait()        except:            pass    else:        self.sem.acquire()                                              # 缓冲池可以立马使用        p = self.procs.spawn(func, *args, **kwargs)                     # 生成协程并执行        # assuming the above line cannot raise        p.link(lambda p: self.sem.release())    return p

此时调用了spawn方法,

def spawn(self, func, *args, **kwargs):        p = spawn(func, *args, **kwargs)        # 生成协程        self.add(p)        return p

此时的spawn就是调用了,

def spawn(function, *args, **kwargs):    """Create a new greenlet that will run `function(*args)'.    The current greenlet won't be unscheduled. Keyword arguments aren't    supported (limitation of greenlet), use spawn() to work around that.    """    g = Greenlet(lambda : function(*args, **kwargs))        # 生成协程    g.parent = get_hub().greenlet                           # 设置生成协程的parent    timer(0, g.switch)                                      # 注册0秒后执行    return g                                                # 返回该协程

此时,马上就会被执行的就是pool.execute_async中传入的函数和参数,即serv.process_request和参数client_socket,

此时查看Server的process_request方法,

def process_request(self, (socket, address)):    proto = self.protocol(socket, address, self)    proto.handle()

此时的protocol就是默认传入的HttpProtocol,通过生成该类的实例,然后调用该实例的handle方法,HttpProtocol类继承自BaseHTTPRequestHandler,此时BaseRequestHandler的流程是,在初始化完成的时候就分别调用setup,handler和finish方法,初始化的时候就开始解析请求并处理请求,请求完成后就finish。

由于在setup时调用的是StreamRequestHandler的setup方法,该方法如下,

def setup(self):    self.connection = self.request    if self.timeout is not None:        self.connection.settimeout(self.timeout)    if self.disable_nagle_algorithm:        self.connection.setsockopt(socket.IPPROTO_TCP,                                   socket.TCP_NODELAY, True)    self.rfile = self.connection.makefile('rb', self.rbufsize)    self.wfile = self.connection.makefile('wb', self.wbufsize)

该方法中的connection就是接入请求时,初始化的GreenSocket实例,此时查看GreenSocket的makefile方法,

def makefile(self, mode='r', bufsize=-1):    return _socket._fileobject(self.dup(), mode, bufsize)

此时的调用了self.dup()方法,并使用了_fileobject类初始化,

def dup(self, *args, **kw):    sock = self.fd.dup(*args, **kw)             # 调用socket的dup方法    set_nonblocking(sock)                       # 设置非阻塞    newsock = type(self)(sock)                  # 生成GreenSocket实例    newsock.settimeout(self.timeout)            # 设置超时时间    return newsock                              # 返回实例

此时就分别设置了rfile和wfile,此时就调用了handle方法,该方法调用的是BaseHTTPRequestHandler的handler方法,

def handle(self):    """Handle multiple requests if necessary."""    self.close_connection = 1    self.handle_one_request()    while not self.close_connection:        self.handle_one_request()

由于HttpProtocol重写了handle_one_request方法,所以此时就调用了HttpProtocol的handle_one_request方法,

def handle_one_request(self):    if self.server.max_http_version:        self.protocol_version = self.server.max_http_version            # 设置http版本    if self.rfile.closed:        self.close_connection = 1                                       # 检查rfile是否关闭        return                                                          # 如果关闭则返回    try:        self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE)    # 从rfile中读数据        if len(self.raw_requestline) == MAX_REQUEST_LINE:               # 如果超过大小则直接写入返回            self.wfile.write(                "HTTP/1.0 414 Request URI Too Long\r\nConnection: close\r\nContent-length: 0\r\n\r\n")            self.close_connection = 1            return    except socket.error, e:        if e[0] != errno.EBADF:            raise        self.raw_requestline = ''    if not self.raw_requestline:                                        # 如果读入数据为空        self.close_connection = 1        return                                                          # 直接返回    if not self.parse_request():                                        # 解析request实例        return    self.environ = self.get_environ()                                   # 获取环境变量    self.application = self.server.app                                  # 获取app处理函数    try:        self.server.outstanding_requests += 1        try:            self.handle_one_response()                                  # 处理并返回        except socket.error, e:            # Broken pipe, connection reset by peer            if e[0] in (32, 54):                pass            else:                raise    finally:        self.server.outstanding_requests -= 1

此时,就需要通过self.rfile.readline(MAX_REQUEST_LINE)读取传入的数据,由于此时的rfile是由GreenSocket实例传入的,通过查找_objfile的readline方法,该方法中第一次执行会执行到 self._sock.recv(self._rbufsize)方法,此时的_sock就是在实例化是传入的GreenSocket,此时就调用了GreenSocket实例的recv方法,

recv = higher_order_recv(socket_recv)

其中higher_order_recv函数,如下,

def higher_order_recv(recv_func):    def recv(self, buflen):        if self.act_non_blocking:                                   # 是否是设置成阻塞模式            return self.fd.recv(buflen)                             # 直接读取数据        buf = self.recvbuffer                                       # 获取接受缓冲区        if buf:                                                     # 如果接受缓冲区有内容            chunk, self.recvbuffer = buf[:buflen], buf[buflen:]     # 获取指定大小的数据并返回            return chunk        fd = self.fd                                                # 如果接受缓冲区没有数据则需要读取数据        bytes = recv_func(fd, buflen)                               # 由于设置的是非阻塞模式,如果没有可读数据会返回为空        if self.gettimeout():                                       # 获取超时时间            end = time.time()+self.gettimeout()        else:            end = None                                              # 计算结束时间        timeout_seconds = None        while bytes is None:                                        # 如果读入的数据为空            try:                if end:                     timeout_seconds = end - time.time()             # 计算超时时间                wait_reader(fd.fileno(), timeout=timeout_seconds, timeout_exc=timeout) # 注册读事件并切换到parent运行            except timeout:                raise            except error, e:                if e[0] == errno.EPIPE:                    bytes = ''                else:                    raise            else:                bytes = recv_func(fd, buflen)                       # 当有读事件返回时读取数据        return bytes    return recv

socket_recv函数主要是进行相关错误的捕捉,

def socket_recv(descriptor, buflen):    try:        return descriptor.recv(buflen)    except error, e:        if e[0] == errno.EWOULDBLOCK:            return None        if e[0] in SOCKET_CLOSED:            return ''        raise    except SSL.WantReadError:        return None    except SSL.ZeroReturnError:        return ''    except SSL.SysCallError, e:        if e[0] == -1 or e[0] > 0:            return ''        raise

此时就注册了读事件并切换到parent中进行事件循环,等待连接的请求有数据可读。当有数据读入的时候从parent协程切换后继续执行,此时就执行到了HttpProtocol实例的handle_one_response方法,该方法主要就是处理具体的请求处理,

def handle_one_response(self):    start = time.time()    headers_set = []    headers_sent = []    # set of lowercase header names that were sent    header_dict = {}    wfile = self.wfile    result = None    use_chunked = [False]    length = [0]    status_code = [200]     ...     try:        try:            result = self.application(self.environ, start_response)                         # 调用具体的业务处理函数,例子中的hello_world            if not headers_sent and hasattr(result, '__len__'):                headers_set[1].append(('content-length', str(sum(map(len, result)))))            towrite = []            for data in result:                                                             # 将处理返回数据写入                if data:                    write(data)                                                             # 调用write函数写入data            if not headers_sent:                write('')            if use_chunked[0]:                wfile.write('0\r\n\r\n')        except Exception, e:            self.close_connection = 1            exc = traceback.format_exc()            print exc            if not headers_set:                start_response("500 Internal Server Error", [('Content-type', 'text/plain')])                write(exc)

此时就待业务处理完成后就调用了write函数处理返回数据,此时的write函数就位于hande_one_response中的write函数,

def write(data, _writelines=wfile.writelines):                      # 写入文件的方法             ...               try:            _writelines(towrite)                                        # 写入数据            length[0] = length[0] + sum(map(len, towrite))        except UnicodeEncodeError:            print "Encountered unicode while attempting to write wsgi response: ", [x for x in towrite if isinstance(x, unicode)]            traceback.print_exc()            _writelines(                ["HTTP/1.0 500 Internal Server Error\r\n",                "Connection: close\r\n",                "Content-type: text/plain\r\n",                "Content-length: 98\r\n",                "\r\n",                "Internal Server Error: wsgi application passed a unicode object to the server instead of a string."])

此时就调用了wfile.writelines此时对应到_fileobject中的writelines,该方法如下,

def writelines(self, list):    # XXX We could do better here for very long lists    # XXX Should really reject non-string non-buffers    lines = filter(None, map(str, list))    self._wbuf_len += sum(map(len, lines))    self._wbuf.extend(lines)    if (self._wbufsize <= 1 or        self._wbuf_len >= self._wbufsize):        self.flush()

调用了flush方法,

def flush(self):    if self._wbuf:        data = "".join(self._wbuf)        self._wbuf = []        self._wbuf_len = 0        buffer_size = max(self._rbufsize, self.default_bufsize)        data_size = len(data)        write_offset = 0        view = memoryview(data)        try:            while write_offset < data_size:                self._sock.sendall(view[write_offset:write_offset+buffer_size])                write_offset += buffer_size        finally:            if write_offset < data_size:                remainder = data[write_offset:]                del view, data  # explicit free                self._wbuf.append(remainder)                self._wbuf_len = len(remainder)

调用了self._sock.sendall方法,即调用了GreenSocket实例的sendall方法,

def sendall(self, data):    fd = self.fd    tail = self.send(data)                                  # 发送数据    while tail < len(data):                                 # 检查数据是否发送完成        wait_writer(self.fileno(), timeout_exc=timeout)     # 注册等待写入事件        tail += self.send(data[tail:])                      # 当有可写时切换回来之后就发送数据

此时就将处理完成的数据返回,至此一个请求完成的基于gevent的请求接受和处理数据的发送就处理完成。

gevent总结

gevent基于greenlet的协程的切换的执行方式,利用包装的GreenSocket类将数据的请求的时间进行了封装,达到了遇到阻塞事件时,将阻塞事件注册并切换到主parent中执行事件循环,待注册事件发生后,再切换回原协程继续执行,至此gevent的大致原理如上。

转载地址:https://blog.csdn.net/qq_33339479/article/details/81143775 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python3.5全局解释器锁GIL-实现原理浅析
下一篇:flask源码学习-路由的注册与请求处理的过程

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年05月01日 21时53分23秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章