gevent Source Code Analysis
This article is based on gevent-0.9.0.
Introduction to gevent
gevent is a concurrency framework for Python. It builds coroutines on top of the greenlet library and drives them with libevent's high-performance I/O multiplexing. Through monkey patching, the blocking I/O operations in a program can be converted into gevent's corresponding non-blocking operations, achieving high performance while keeping the intrusion into application code to a minimum.
A gevent Example
Since gevent implements its coroutines on top of greenlet, let's first look at basic greenlet usage:
```python
from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
```
A function that should run as a coroutine must be wrapped in a greenlet. After wrapping gr1 and gr2, calling gr1.switch() enters the test1 coroutine. test1 prints 12 and then calls gr2.switch(), which transfers control to gr2 and runs test2. After test2 prints 56, it calls gr1.switch(), which jumps back to gr1; gr1 resumes where it left off and prints 34, and then the coroutine finishes. Since nothing ever switches back to gr2, 78 is never printed.
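If greenlet is not installed, the same switching order can be mimicked with plain generators and a tiny trampoline. This is only an illustration of the control flow; the names out and coros and the string-based switch targets are inventions of this sketch, not greenlet API:

```python
def test1():
    out.append(12)
    yield "gr2"  # like gr2.switch()
    out.append(34)

def test2():
    out.append(56)
    yield "gr1"  # like gr1.switch()
    out.append(78)

out = []
coros = {"gr1": test1(), "gr2": test2()}
current = "gr1"
while True:
    try:
        current = next(coros[current])  # run until the next "switch"
    except StopIteration:
        break  # the running coroutine returned, just like the script ending
print(out)  # [12, 56, 34]; 78 is never reached
```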
With this basic understanding of greenlet, one more concept is needed: a greenlet can be given a parent, and when the child greenlet finishes (becomes dead), control jumps back to the parent and continues there. With these concepts in place, look at the examples/wsgi.py sample code shipped with gevent-0.9.0:
```python
from gevent import wsgi, socket

def hello_world(env, start_response):
    if env['PATH_INFO'] != '/':
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return ['Not Found\r\n']
    else:
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ["Hello World!\r\n"]

wsgi.server(socket.tcp_listener(('', 8080)), hello_world)
```
We will now use this sample code to walk through gevent's execution flow.
Execution Flow of the gevent Example
The socket.tcp_listener function
Following the sample code, first look at gevent's socket.tcp_listener function:
```python
def tcp_listener(address, backlog=50):
    """
    Listen on the given (ip, port) *address* with a TCP socket.
    Returns a socket object on which one should call ``accept()`` to
    accept a connection on the newly bound socket.

    Generally, the returned socket will be passed to ``tcp_server()``,
    which accepts connections forever and spawns greenlets for each
    incoming connection.
    """
    sock = GreenSocket()                                    # create a GreenSocket
    socket_bind_and_listen(sock, address, backlog=backlog)  # bind and listen on the port
    return sock                                             # return the listening socket
```
Now look at the GreenSocket class:
```python
class GreenSocket(object):
    is_secure = False
    timeout = None

    def __init__(self, family_or_realsock=_socket.AF_INET, *args, **kwargs):
        if isinstance(family_or_realsock, (int, long)):  # an address family was passed in
            fd = _original_socket(family_or_realsock, *args, **kwargs)
        else:
            fd = family_or_realsock  # otherwise it is an existing socket object
            assert not args, args
            assert not kwargs, kwargs
        set_nonblocking(fd)  # put the descriptor into non-blocking mode
        self.fd = fd
        self._fileno = fd.fileno()  # cache the file descriptor number
        self.recvbuffer = ''
        self.closed = False
        self.timeout = _socket.getdefaulttimeout()  # inherit the default socket timeout
        # when client calls setblocking(0) or settimeout(0) the socket must
        # act non-blocking
        self.act_non_blocking = False  # flag: behave as a plain non-blocking socket
```
During initialization, the set_nonblocking function is called:
```python
def set_nonblocking(fd):
    try:
        setblocking = fd.setblocking  # sockets have this method
    except AttributeError:
        # This version of Python predates socket.setblocking()
        import fcntl  # plain files do not, so use fcntl
        fileno = fd.fileno()
        flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
        fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)  # set O_NONBLOCK directly
    else:
        # socket supports setblocking()
        setblocking(0)  # switch the socket to non-blocking mode
```
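The fcntl branch of set_nonblocking can be tried with nothing but the standard library. The sketch below (Python 3, POSIX only) puts the read end of a pipe into non-blocking mode and shows that a read with no data available fails immediately instead of blocking:

```python
import fcntl
import os

def set_nonblocking(fd):
    # Same idea as gevent's helper, taking the raw file-descriptor path.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

r, w = os.pipe()
set_nonblocking(r)
try:
    os.read(r, 1)        # no data yet: returns immediately instead of blocking
    got_error = False
except BlockingIOError:  # Python 3 raises this for EAGAIN/EWOULDBLOCK
    got_error = True
os.close(r)
os.close(w)
print(got_error)  # True
```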
Once the GreenSocket is initialized, continue with the socket_bind_and_listen function:
```python
def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):
    set_reuse_addr(descriptor)  # allow the port to be reused immediately
    descriptor.bind(addr)       # bind the address
    descriptor.listen(backlog)  # set the accept-queue length
    return descriptor
```
Here bind and listen are called on the GreenSocket instance, and both delegate to the native socket's bind and listen methods.
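For reference, a stdlib-only equivalent of set_reuse_addr plus socket_bind_and_listen might look like this (bind_and_listen is a hypothetical helper for this sketch, not gevent code):

```python
import socket

def bind_and_listen(addr=('127.0.0.1', 0), backlog=50):
    # SO_REUSEADDR lets a restarted server rebind the port while old
    # connections are still lingering in TIME_WAIT.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(addr)  # port 0 asks the kernel for any free port
    sock.listen(backlog)
    return sock

listener = bind_and_listen()
host, port = listener.getsockname()
print(host, port > 0)  # 127.0.0.1 True
listener.close()
```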
At this point socket.tcp_listener has finished: the socket is listening on the given address and port, and a GreenSocket instance has been returned. Next, look at the wsgi.server function.
The wsgi.server function
```python
def server(sock, site, log=None, environ=None, max_size=None,
           max_http_version=DEFAULT_MAX_HTTP_VERSION, protocol=HttpProtocol,
           server_event=None, minimum_chunk_size=None):
    serv = Server(sock, sock.getsockname(), site, log, environ=None,
                  max_http_version=max_http_version, protocol=protocol,
                  minimum_chunk_size=minimum_chunk_size)  # build the Server instance
    if server_event is not None:  # notify the caller that the server is up
        server_event.send(serv)
    if max_size is None:
        max_size = DEFAULT_MAX_SIMULTANEOUS_REQUESTS  # fall back to the default limit
    pool = Pool(size=max_size)  # pool bounding the number of concurrent requests
    try:
        host, port = sock.getsockname()  # describe the listening address
        port = ':%s' % (port, )
        if sock.is_secure:
            scheme = 'https'
            if port == ':443':
                port = ''
        else:
            scheme = 'http'
            if port == ':80':
                port = ''
        print "(%s) wsgi starting up on %s://%s%s/" % (os.getpid(), scheme, host, port)
        while True:  # the accept loop
            try:
                try:
                    client_socket = sock.accept()  # wait for a new connection
                except socket.error, e:
                    if e[0] != errno.EPIPE and e[0] != errno.EBADF:
                        raise
                pool.execute_async(serv.process_request, client_socket)  # handle it in the pool
            except KeyboardInterrupt:
                print "wsgi exiting"
                break
    finally:
        try:
            sock.close()  # close the listener on shutdown
        except socket.error, e:
            if e[0] != errno.EPIPE:
                traceback.print_exc()
```
The Server class is initialized as follows:
```python
class Server(BaseHTTPServer.HTTPServer):

    def __init__(self, socket, address, app, log=None, environ=None,
                 max_http_version=None, protocol=HttpProtocol,
                 minimum_chunk_size=None):
        self.outstanding_requests = 0
        self.socket = socket    # the listening socket passed in
        self.address = address  # the listening address
        if log:                 # logging configuration
            self.log = log
        else:
            self.log = sys.stderr
        self.app = app          # the WSGI application callable
        self.environ = environ  # extra environment variables
        self.max_http_version = max_http_version
        self.protocol = protocol  # the handler class for requests
        self.pid = os.getpid()
        if minimum_chunk_size is not None:
            protocol.minimum_chunk_size = minimum_chunk_size  # set the chunk size
```
As shown, Server inherits from BaseHTTPServer.HTTPServer. After instantiating it, server() creates a Pool, a worker pool that runs submitted tasks as soon as a slot is free (the implementation is worth reading if you are interested). The program then enters a loop waiting for incoming connections via sock.accept(), which is the accept method of the GreenSocket instance:
```python
def accept(self):
    if self.act_non_blocking:    # plain non-blocking mode:
        return self.fd.accept()  # delegate to the real socket
    fd = self.fd
    while True:
        res = socket_accept(fd)  # try to accept without blocking
        if res is not None:
            client, addr = res   # a connection arrived
            set_nonblocking(client)  # make it non-blocking too
            return type(self)(client), addr  # wrap it in a new GreenSocket
        wait_reader(fd.fileno(), timeout=self.gettimeout(),
                    timeout_exc=timeout)  # nothing yet: register a read event and wait
```
socket_accept simply calls the accept method of the given fd. When a connection comes in, a new GreenSocket is instantiated and returned; otherwise a read event is registered and wait_reader waits for a connection to arrive:
```python
def wait_reader(fileno, timeout=-1, timeout_exc=TimeoutError):
    # register a read event with libevent, passing the fd, the callback and its arguments
    evt = core.read(fileno, _wait_helper, timeout, (getcurrent(), timeout_exc))
    try:
        returned_ev = get_hub().switch()  # switch into the hub (the main parent)
        assert evt is returned_ev, (evt, returned_ev)
    finally:
        evt.cancel()  # drop the event once it has fired
```
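Outside gevent, the core of wait_reader, "suspend until this descriptor is readable, with an optional timeout", can be sketched with the stdlib selectors module standing in for libevent (wait_readable is a made-up name for this sketch):

```python
import selectors
import socket

def wait_readable(fileobj, timeout=None):
    # Block until fileobj is readable, or until the timeout expires.
    sel = selectors.DefaultSelector()
    sel.register(fileobj, selectors.EVENT_READ)
    try:
        return bool(sel.select(timeout))  # [] on timeout, non-empty when readable
    finally:
        sel.close()

a, b = socket.socketpair()
print(wait_readable(a, timeout=0))  # False: nothing to read yet
b.sendall(b'ping')
print(wait_readable(a, timeout=1))  # True: data has arrived
a.close()
b.close()
```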
If no connection request has arrived yet, wait_reader is called: it first registers the read event and then switches into the main loop. Let's first look at the get_hub() function:
```python
def get_hub():
    global _threadlocal
    try:
        hub = _threadlocal.hub  # is there already a hub for this thread?
    except AttributeError:
        # do not import anything that can be monkey-patched at top level
        import threading
        _threadlocal = threading.local()  # one hub per thread
        hub = _threadlocal.hub = Hub()    # create the Hub on first use
    return hub
```
Continuing with the Hub class:
```python
class Hub(object):

    def __init__(self):
        self.greenlet = Greenlet(self.run)  # wrap self.run in a greenlet
        self.keyboard_interrupt_signal = None

    def switch(self):
        cur = getcurrent()  # the greenlet that is switching away
        assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
        switch_out = getattr(cur, 'switch_out', None)  # run the hook if the current greenlet has one
        if switch_out is not None:
            try:
                switch_out()
            except:
                traceback.print_exception(*sys.exc_info())
        if self.greenlet.dead:  # has the hub greenlet already finished?
            self.greenlet = Greenlet(self.run)  # recreate it
        return self.greenlet.switch()  # transfer control to the hub

    def run(self, *args, **kwargs):
        if self.keyboard_interrupt_signal is None:
            self.keyboard_interrupt_signal = signal(2, MAIN.throw, KeyboardInterrupt)
        while True:
            result = core.dispatch()  # let libevent dispatch any pending events
            if result > 0:
                return 'Hub.run() has finished because there are no events registered'
            elif result < 0:
                return 'Hub.run() has finished because there was an error'
        return result
```
So get_hub().switch() calls the Hub's switch method, which in turn runs the Hub's run method: a loop that keeps checking whether any registered event has fired, blocking until one does and then handling it.
Now suppose a connection request arrives. The _wait_helper callback that wait_reader registered earlier is invoked:
```python
def _wait_helper(ev, fd, evtype):
    current, timeout_exc = ev.arg   # the waiting greenlet and its timeout exception
    if evtype & core.EV_TIMEOUT:
        current.throw(timeout_exc)  # the wait timed out: raise inside the waiter
    else:
        current.switch(ev)          # resume the waiting greenlet, handing back the event
```
Control then switches back into the while True loop inside GreenSocket.accept, which accepts the incoming request via socket_accept() and returns a new GreenSocket instance.
After the new connection is accepted, execution returns to the server function, which runs
pool.execute_async(serv.process_request, client_socket)
Look at this method on Pool:
```python
def execute_async(self, func, *args, **kwargs):
    if self.sem.locked():  # the pool is full: spawn a coroutine that waits for a slot
        return spawn_greenlet(self.execute, func, *args, **kwargs)
    else:                  # a slot is free: run through execute directly
        return self.execute(func, *args, **kwargs)
```
Continue with the self.execute function:
```python
def execute(self, func, *args, **kwargs):
    """Execute func in one of the coroutines maintained by the pool,
    when one is free.

    Immediately returns a Proc object which can be queried for the
    func's result.

    >>> pool = Pool()
    >>> task = pool.execute(lambda a: ('foo', a), 1)
    >>> task.wait()
    ('foo', 1)
    """
    # if reentering an empty pool, don't try to wait on a coroutine freeing
    # itself -- instead, just execute in the current coroutine
    if self.sem.locked() and gevent.getcurrent() in self.procs:  # reentrant call from within the pool
        p = spawn(func, *args, **kwargs)
        try:
            p.wait()
        except:
            pass
    else:
        self.sem.acquire()  # take a slot; blocks until one is free
        p = self.procs.spawn(func, *args, **kwargs)  # spawn the coroutine and run it
        # assuming the above line cannot raise
        p.link(lambda p: self.sem.release())  # free the slot when it finishes
    return p
```
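The pool's key mechanic, a semaphore capping concurrency that is released via p.link when a task ends, can be sketched with threads standing in for greenlets (TinyPool is a toy invented here, not the gevent Pool):

```python
import threading

class TinyPool:
    def __init__(self, size):
        self.sem = threading.Semaphore(size)  # counts free slots

    def execute_async(self, func, *args):
        def worker():
            try:
                func(*args)
            finally:
                self.sem.release()  # free the slot, like p.link(...release) above
        self.sem.acquire()  # blocks the submitter while the pool is full
        t = threading.Thread(target=worker)
        t.start()
        return t

results = []
pool = TinyPool(size=2)  # at most two tasks in flight at once
threads = [pool.execute_async(results.append, i) for i in range(5)]
for t in threads:
    t.join()
print(sorted(results))  # [0, 1, 2, 3, 4]
```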
This calls the spawn method:
```python
def spawn(self, func, *args, **kwargs):
    p = spawn(func, *args, **kwargs)  # create the coroutine
    self.add(p)
    return p
```
which in turn calls:
```python
def spawn(function, *args, **kwargs):
    """Create a new greenlet that will run `function(*args, **kwargs)'.
    The current greenlet won't be unscheduled. Keyword arguments aren't
    supported (limitation of greenlet), use spawn() to work around that.
    """
    g = Greenlet(lambda : function(*args, **kwargs))  # wrap the call in a greenlet
    g.parent = get_hub().greenlet  # a dying greenlet falls back into the hub
    timer(0, g.switch)  # schedule it to start after 0 seconds
    return g            # return the greenlet
```
The function that is about to run is the one passed to pool.execute_async, namely serv.process_request with the argument client_socket.
Now look at the Server's process_request method:
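The deferred start performed by timer(0, g.switch) can be modeled with a toy run queue, with generators standing in for greenlets (all names in this sketch are invented for illustration):

```python
from collections import deque

ready = deque()  # tasks scheduled to run on the next loop iteration
log = []

def spawn(gen_func, *args):
    g = gen_func(*args)
    ready.append(g)  # like timer(0, g.switch): queue it, don't run it yet
    return g

def hub_run():
    while ready:  # the dispatch loop
        g = ready.popleft()
        try:
            next(g)          # run the task until it yields or finishes
            ready.append(g)  # it yielded: reschedule it
        except StopIteration:
            pass             # the task is done

def task(name):
    log.append(name + ':start')
    yield
    log.append(name + ':end')

spawn(task, 'a')
spawn(task, 'b')
log.append('main')  # spawn returned immediately; nothing has run yet
hub_run()
print(log)  # ['main', 'a:start', 'b:start', 'a:end', 'b:end']
```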
```python
def process_request(self, (socket, address)):
    proto = self.protocol(socket, address, self)
    proto.handle()
```
The protocol here is the default HttpProtocol. process_request instantiates it and calls its handle method. HttpProtocol inherits from BaseHTTPRequestHandler, and the BaseRequestHandler protocol is that initialization itself calls setup, handle and finish in turn: the request is parsed and processed during initialization, and finish runs once processing completes.
The setup call resolves to StreamRequestHandler's setup method, which looks like this:
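The setup/handle/finish lifecycle described above can be shown with a minimal model of BaseRequestHandler (the classes below are a simplified imitation written for this sketch, not the stdlib ones):

```python
class BaseHandler:
    # __init__ immediately runs setup(), then handle(), then finish(),
    # just like SocketServer.BaseRequestHandler.
    def __init__(self, request):
        self.request = request
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self): pass
    def handle(self): pass
    def finish(self): pass

calls = []

class EchoHandler(BaseHandler):
    def setup(self):
        calls.append('setup')
    def handle(self):
        calls.append('handle:' + self.request)
    def finish(self):
        calls.append('finish')

EchoHandler('GET /')  # constructing the handler runs the whole lifecycle
print(calls)  # ['setup', 'handle:GET /', 'finish']
```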
```python
def setup(self):
    self.connection = self.request
    if self.timeout is not None:
        self.connection.settimeout(self.timeout)
    if self.disable_nagle_algorithm:
        self.connection.setsockopt(socket.IPPROTO_TCP,
                                   socket.TCP_NODELAY, True)
    self.rfile = self.connection.makefile('rb', self.rbufsize)
    self.wfile = self.connection.makefile('wb', self.wbufsize)
```
The connection here is the GreenSocket instance created when the request was accepted. Look at GreenSocket's makefile method:
```python
def makefile(self, mode='r', bufsize=-1):
    return _socket._fileobject(self.dup(), mode, bufsize)
```
It calls self.dup() and wraps the result in a _fileobject:
```python
def dup(self, *args, **kw):
    sock = self.fd.dup(*args, **kw)   # duplicate the underlying socket
    set_nonblocking(sock)             # the duplicate must be non-blocking too
    newsock = type(self)(sock)        # wrap it in a new GreenSocket
    newsock.settimeout(self.timeout)  # carry over the timeout
    return newsock
```
This sets up rfile and wfile. Then handle is called, which is BaseHTTPRequestHandler's handle method:
```python
def handle(self):
    """Handle multiple requests if necessary."""
    self.close_connection = 1
    self.handle_one_request()
    while not self.close_connection:
        self.handle_one_request()
```
Since HttpProtocol overrides handle_one_request, that override is what actually runs:
```python
def handle_one_request(self):
    if self.server.max_http_version:
        self.protocol_version = self.server.max_http_version  # cap the HTTP version
    if self.rfile.closed:  # the read side is already closed
        self.close_connection = 1
        return
    try:
        self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE)  # read the request line
        if len(self.raw_requestline) == MAX_REQUEST_LINE:  # too long: reply and bail out
            self.wfile.write(
                "HTTP/1.0 414 Request URI Too Long\r\nConnection: close\r\nContent-length: 0\r\n\r\n")
            self.close_connection = 1
            return
    except socket.error, e:
        if e[0] != errno.EBADF:
            raise
        self.raw_requestline = ''
    if not self.raw_requestline:  # nothing was read: close the connection
        self.close_connection = 1
        return
    if not self.parse_request():  # parse the request line and headers
        return
    self.environ = self.get_environ()   # build the WSGI environ
    self.application = self.server.app  # the application callable
    try:
        self.server.outstanding_requests += 1
        try:
            self.handle_one_response()  # produce and send the response
        except socket.error, e:
            # Broken pipe, connection reset by peer
            if e[0] in (32, 54):
                pass
            else:
                raise
    finally:
        self.server.outstanding_requests -= 1
The request line is read with self.rfile.readline(MAX_REQUEST_LINE). Since rfile was created from the GreenSocket instance, _fileobject's readline eventually reaches self._sock.recv(self._rbufsize), where _sock is the GreenSocket passed in at instantiation, so GreenSocket's recv method is called:
```python
recv = higher_order_recv(socket_recv)
```
where the higher_order_recv function is as follows:
```python
def higher_order_recv(recv_func):
    def recv(self, buflen):
        if self.act_non_blocking:        # plain non-blocking mode:
            return self.fd.recv(buflen)  # read directly
        buf = self.recvbuffer            # the receive buffer
        if buf:                          # serve buffered data first
            chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
            return chunk
        fd = self.fd                     # buffer empty: read from the socket
        bytes = recv_func(fd, buflen)    # non-blocking, so returns None if nothing is readable
        if self.gettimeout():
            end = time.time() + self.gettimeout()  # compute the deadline
        else:
            end = None
        timeout_seconds = None
        while bytes is None:             # nothing was readable
            try:
                if end:
                    timeout_seconds = end - time.time()  # remaining time
                wait_reader(fd.fileno(), timeout=timeout_seconds,
                            timeout_exc=timeout)  # register a read event and switch to the hub
            except timeout:
                raise
            except error, e:
                if e[0] == errno.EPIPE:
                    bytes = ''
                else:
                    raise
            else:
                bytes = recv_func(fd, buflen)  # the read event fired: read the data
        return bytes
    return recv
```
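The buffer-first logic at the top of higher_order_recv can be isolated into a small sketch (BufferedRecv is a made-up class for this demo; a lambda stands in for the real socket read):

```python
class BufferedRecv:
    def __init__(self, read_func):
        self.recvbuffer = b''
        self.read_func = read_func  # stands in for socket_recv(fd, buflen)

    def recv(self, buflen):
        if self.recvbuffer:  # drain the buffer before touching the socket
            chunk, self.recvbuffer = self.recvbuffer[:buflen], self.recvbuffer[buflen:]
            return chunk
        return self.read_func(buflen)

br = BufferedRecv(lambda n: b'x' * n)
br.recvbuffer = b'hello'
print(br.recv(3))   # b'hel' from the buffer
print(br.recv(10))  # b'lo', the rest of the buffer, even though 10 was asked for
print(br.recv(2))   # b'xx', buffer empty so the real read runs
```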
socket_recv mainly handles the relevant error cases:
```python
def socket_recv(descriptor, buflen):
    try:
        return descriptor.recv(buflen)
    except error, e:
        if e[0] == errno.EWOULDBLOCK:
            return None  # nothing to read right now; the caller will wait and retry
        if e[0] in SOCKET_CLOSED:
            return ''    # the peer closed the connection
        raise
    except SSL.WantReadError:
        return None
    except SSL.ZeroReturnError:
        return ''
    except SSL.SysCallError, e:
        if e[0] == -1 or e[0] > 0:
            return ''
        raise
```
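On Python 3 the same mapping falls out of BlockingIOError; a rough equivalent of socket_recv's non-SSL paths might look like this sketch:

```python
import socket
import time

def socket_recv(sock, buflen):
    # EWOULDBLOCK becomes None ("retry later"); a clean close already
    # surfaces as b'' from recv itself.
    try:
        return sock.recv(buflen)
    except BlockingIOError:  # EAGAIN/EWOULDBLOCK on a non-blocking socket
        return None

a, b = socket.socketpair()
a.setblocking(False)
print(socket_recv(a, 16))  # None: no data yet
b.sendall(b'hi')
time.sleep(0.05)           # give the data a moment to arrive
print(socket_recv(a, 16))  # b'hi'
a.close()
b.close()
```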
So a read event is registered and control switches to the parent to run the event loop, waiting until the connection has data to read. When data arrives, control switches back from the parent greenlet and execution continues into the HttpProtocol instance's handle_one_response method, which performs the actual request handling:
```python
def handle_one_response(self):
    start = time.time()
    headers_set = []
    headers_sent = []
    # set of lowercase header names that were sent
    header_dict = {}
    wfile = self.wfile
    result = None
    use_chunked = [False]
    length = [0]
    status_code = [200]
    ...
    try:
        try:
            result = self.application(self.environ, start_response)  # run the application, e.g. hello_world
            if not headers_sent and hasattr(result, '__len__'):
                headers_set[1].append(('content-length', str(sum(map(len, result)))))
            towrite = []
            for data in result:  # write out the returned data
                if data:
                    write(data)  # emit each chunk via write()
            if not headers_sent:
                write('')
            if use_chunked[0]:
                wfile.write('0\r\n\r\n')
        except Exception, e:
            self.close_connection = 1
            exc = traceback.format_exc()
            print exc
            if not headers_set:
                start_response("500 Internal Server Error",
                               [('Content-type', 'text/plain')])
                write(exc)
```
Once the application has produced a response, the write function defined inside handle_one_response is called to emit the data:
```python
def write(data, _writelines=wfile.writelines):  # bound to the response's wfile
    ...
    try:
        _writelines(towrite)  # write the assembled data
        length[0] = length[0] + sum(map(len, towrite))
    except UnicodeEncodeError:
        print "Encountered unicode while attempting to write wsgi response: ", \
            [x for x in towrite if isinstance(x, unicode)]
        traceback.print_exc()
        _writelines(
            ["HTTP/1.0 500 Internal Server Error\r\n",
             "Connection: close\r\n",
             "Content-type: text/plain\r\n",
             "Content-length: 98\r\n",
             "\r\n",
             "Internal Server Error: wsgi application passed "
             "a unicode object to the server instead of a string."])
```
wfile.writelines resolves to _fileobject's writelines, which is as follows:
```python
def writelines(self, list):
    # XXX We could do better here for very long lists
    # XXX Should really reject non-string non-buffers
    lines = filter(None, map(str, list))
    self._wbuf_len += sum(map(len, lines))
    self._wbuf.extend(lines)
    if (self._wbufsize <= 1 or
            self._wbuf_len >= self._wbufsize):
        self.flush()
```
which calls the flush method:
```python
def flush(self):
    if self._wbuf:
        data = "".join(self._wbuf)
        self._wbuf = []
        self._wbuf_len = 0
        buffer_size = max(self._rbufsize, self.default_bufsize)
        data_size = len(data)
        write_offset = 0
        view = memoryview(data)
        try:
            while write_offset < data_size:
                self._sock.sendall(view[write_offset:write_offset + buffer_size])
                write_offset += buffer_size
        finally:
            if write_offset < data_size:
                remainder = data[write_offset:]
                del view, data  # explicit free
                self._wbuf.append(remainder)
                self._wbuf_len = len(remainder)
```
flush calls self._sock.sendall, i.e. the GreenSocket instance's sendall method:
```python
def sendall(self, data):
    fd = self.fd
    tail = self.send(data)   # try to send everything in one call
    while tail < len(data):  # not all of it was written
        wait_writer(self.fileno(), timeout_exc=timeout)  # register a write event and wait
        tail += self.send(data[tail:])  # writable again: send the remainder
```
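The sendall pattern, looping over partial sends and advancing an offset until everything is written, can be sketched with a fake send that writes at most 4 bytes per call (fake_send is invented for the demo; the real code waits for writability between attempts):

```python
sent = []

def fake_send(data):
    # Like socket.send: may write only part of data, returns the byte count.
    chunk = data[:4]
    sent.append(chunk)
    return len(chunk)

def sendall(data, send=fake_send):
    tail = send(data)
    while tail < len(data):
        # a real implementation would wait for writability here (wait_writer)
        tail += send(data[tail:])

sendall(b'hello world')
print(b''.join(sent))  # b'hello world'
print(len(sent))       # 3 calls: 4 + 4 + 3 bytes
```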
The response data is then sent back, completing one full gevent-based cycle of accepting a request and sending out the processed data.
gevent Summary
gevent relies on greenlet's coroutine switching and wraps socket operations in the GreenSocket class. When an operation would block, the corresponding event is registered with the event loop and control switches to the parent hub, which runs the event loop; when the registered event fires, control switches back and the original coroutine resumes where it left off. That, in outline, is how gevent works.
Reposted from: https://blog.csdn.net/qq_33339479/article/details/81143775 If this infringes your copyright, please leave a comment with the address of the original article and we will remove this post; we apologize for any inconvenience.