本文共 42454 字,大约阅读时间需要 141 分钟。
Python标准库asyncio模块基本原理浅析
本文环境python3.7.0
asyncio模块的实现思路
当前编程语言都开始在语言层面上,开始简化对异步程序的编程过程,其中Python中也开始了在语言层面上对异步编程的简化,特地使用了await和async这两个关键字来进行对异步代码的简化和封装。本文就开始简单的分析一下asyncio标准库是怎么来封装异步编程的这么一个过程,大致浏览一下asyncio中一个代码示例的工作流程,其实Python异步编程的本质也是封装了yeild等协程来实现的,涉及到的Task,Future等概念可通过前文来了解一下如果通过关键字yield等来直接进行协程封装的异步编程过程,该过程与本文待分析的asyncio模块包的工作原理基本一致,只不过在asnycio模块中添加了对await和async关键字的支持。
await和async关键字示例浅析
首先来看一下await和async两个关键字的基本示例:
class TestAsync(object): def __await__(self): yield self print("await return value") return 'TestAsync'async def test(): print("start") data = await TestAsync() print("over ", data) return 'test return'if __name__ == '__main__': c = test() res = c.send(None) print('res ', res) try: c.send(None) except StopIteration as exc: print(" c last return ", exc.value)
该示例代码的返回结果如下:
startres <__main__. TestAsync object at 0x7f29e8ebd240>await return valueover TestAsync c last return test return
从运行结果可知,首先先得到一个实例c,然后向实例send值None,此时就执行到了TestAsync类实例的__await__方法处的yield方法,此时就yield返回了自己,此时就打印出了res,再一次通过c去send值None时,此时就执行到了TestAsync的__await__方法剩下的函数流程去执行,此时就报错StopIteration错误,此时返回的值就是__await__方法返回的值,此时test函数在await TestAsync之后的代码继续执行,此时的data 就是__await__的返回值,然后test函数执行完成后,返回’test return’,此时整个执行结束,此时报错的返回值就是test函数执行完成的函数值’test return’,至此一个简单的有关await和async关键字的例子就执行完成了。此处其实不用该关键字,也可以实现同等情况下的效果,可参考yield和yield from 的关键字的使用方法,大家有兴趣可自行尝试。
asyncio模块的原理描述与示例代码
有关异步编程,其实本质上都是通过事件触发来实现的异步编程,本质上都采用了IO复用的方式,来实现非阻塞的操作,通过注册读或写事件来注册当该事件发生时的回调函数,完成的时候就执行回调,让逻辑继续执行,有关IO复用的详细内容,大家可自行查阅相关内容,asyncio模块的本质也是围绕IO复用来实现的时间注册的运行模式,只不过配合了协程来实现,从而使编程的方式更加的简化,可以保存当前函数的执行的过程,从而更方便简洁的实现相关业务逻辑。
import asyncioimport urllib.parsedef test(): async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': reader, writer = await asyncio.open_connection( url.hostname, 443, ssl=True ) else: reader, writer = await asyncio.open_connection( url.hostname, 80 ) query = ( f"HEAD {url.path or '/' } HTTP/1.0\r\n" f"Host: {url.hostname}\r\n" f"\r\n" ) writer.write(query.encode('latin-1')) while True: line = await reader.readline() if not line: break line = line.decode('latin1').rstrip() if line: print(f'HTTP header> {line}') writer.close() url = 'https://www.baidu.com' asyncio.run(print_http_headers(url))if __name__ == '__main__': test()
该函数的执行结果如下:
HTTP header> HTTP/1.0 200 OKHTTP header> Accept-Ranges: bytesHTTP header> Cache-Control: private, no-cache, no-store, proxy-revalidate, no-transformHTTP header> Content-Length: 277HTTP header> Content-Type: text/htmlHTTP header> Date: Tue, 22 Jan 2019 00:40:02 GMTHTTP header> Etag: "575e1f80-115"HTTP header> Last-Modified: Mon, 13 Jun 2016 02:50:40 GMTHTTP header> Pragma: no-cacheHTTP header> Server: bfe/1.0.8.18
接下来,本文就简单分析一下该示例代码的执行流程。
asyncio模块示例代码执行原理分析
首先查看实例代码的asyncio.run()函数的基本内容:
def run(main, *, debug=False): """Run a coroutine. This function runs the passed coroutine, taking care of managing the asyncio event loop and finalizing asynchronous generators. This function cannot be called when another asyncio event loop is running in the same thread. If debug is True, the event loop will be run in debug mode. This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once. Example: async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main()) """ if events._get_running_loop() is not None: # 检查当前线程是否有loop实例,如果不为空则报错 raise RuntimeError( "asyncio.run() cannot be called from a running event loop") # 如果获取不到则报错 if not coroutines.iscoroutine(main): # 检查是否是协程 raise ValueError("a coroutine was expected, got {!r}".format(main)) # 如果不是则报错 loop = events.new_event_loop() # 获取loop循环实例,该实例就是IO复用的循环处理实例 try: events.set_event_loop(loop) # 设置loop loop.set_debug(debug) # 设置loop是否为调试模式 return loop.run_until_complete(main) # 调用run_until_complete方法直到传入的main函数执行完毕 finally: try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) finally: events.set_event_loop(None) loop.close()
该函数主要是做了一些检查,获取loop的实例,然后调用实例的run_until_complete方法,我们查看一下new_event_loop()函数的内容:
def new_event_loop(): """Equivalent to calling get_event_loop_policy().new_event_loop().""" return get_event_loop_policy().new_event_loop() # 调用loop的new_event_loop函数def _init_event_loop_policy(): global _event_loop_policy # 全局变量 _event_loop_policy with _lock: # 加锁 if _event_loop_policy is None: # pragma: no branch from . import DefaultEventLoopPolicy _event_loop_policy = DefaultEventLoopPolicy() # 使用默认的DefaultEventLoopPolicy初始化并获取实例def get_event_loop_policy(): """Get the current event loop policy.""" if _event_loop_policy is None: # 判断全局变量是否为空 _init_event_loop_policy() # 为空则初始化 return _event_loop_policy # 返回初始化实例
此时可知,loop就是DefaultEventLoopPolicy类实例调用new_event_loop方法返回的实例,在Linux系统上DefaultEventLoopPolicy就是_UnixDefaultEventLoopPolicy,继续查看该类的代码:
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" _loop_factory = _UnixSelectorEventLoop # loop实例化的类 ... def set_event_loop(self, loop): """Set the event loop. As a side effect, if a child watcher was set before, then calling .set_event_loop() from the main thread will call .attach_loop(loop) on the child watcher. """ super().set_event_loop(loop) if (self._watcher is not None and isinstance(threading.current_thread(), threading._MainThread)): self._watcher.attach_loop(loop) ...class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): """Default policy implementation for accessing the event loop. In this policy, each thread has its own event loop. However, we only automatically create an event loop by default for the main thread; other threads by default have no event loop. Other policies may have different rules (e.g. a single global event loop, or automatically creating an event loop per thread, or using some other notion of context to which an event loop is associated). """ _loop_factory = None class _Local(threading.local): _loop = None _set_called = False def __init__(self): self._local = self._Local() # 线程安全保存 def get_event_loop(self): # 获取loop """Get the event loop. This may be None or an instance of EventLoop. """ if (self._local._loop is None and not self._local._set_called and isinstance(threading.current_thread(), threading._MainThread)): # 判断loop是否为空 self.set_event_loop(self.new_event_loop()) # 如果为空则设置一个新的loop实例 if self._local._loop is None: # 如果设置完成后仍然为空则报错 raise RuntimeError('There is no current event loop in thread %r.' % threading.current_thread().name) return self._local._loop # 返回loop实例 def set_event_loop(self, loop): """Set the event loop.""" self._local._set_called = True # 设置被设置标志为True assert loop is None or isinstance(loop, AbstractEventLoop) # 判断loop实例是否为AbstractEventLoop子类 self._local._loop = loop # 设置loop类实例 def new_event_loop(self): """Create a new event loop. You must call set_event_loop() to make this the current event loop. """ return self._loop_factory() # 实例化loop类实例
此时loop就是_UnixSelectorEventLoop类的实例,继续查看该类的run_until_complete方法:
_UnixSelectorEventLoop继承自selector_events.BaseSelectorEventLoop继承自base_events.BaseEventLoop继承自events.AbstractEventLoop
此时调用的就是BaseEventLoop的run_until_complete方法;
def run_until_complete(self, future): """Run until the Future is done. If the argument is a coroutine, it is wrapped in a Task. WARNING: It would be disastrous to call run_until_complete() with the same coroutine twice -- it would wrap it in two different Tasks and that can't be good. Return the Future's result, or raise its exception. """ self._check_closed() # 检查loop是否是关闭状态 new_task = not futures.isfuture(future) # 检查是否是future类 future = tasks.ensure_future(future, loop=self) # 生成Task任务实例 if new_task: # 是否是task # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False future.add_done_callback(_run_until_complete_cb) # 添加完成回调方法 try: self.run_forever() # 运行 except: if new_task and future.done() and not future.cancelled(): # 报错则检查任务是否完成 是否取消 # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn't have access to the # local task. future.exception() # 报错 raise finally: future.remove_done_callback(_run_until_complete_cb) # 移除回调方法 if not future.done(): # 如果任务没有完成则报错 raise RuntimeError('Event loop stopped before Future completed.') return future.result() # 返回future 的执行结果
首先查看tasks.ensure_future方法来查看生成Task的方法;
def ensure_future(coro_or_future, *, loop=None): """Wrap a coroutine or an awaitable in a future. If the argument is a Future, it is returned directly. """ if coroutines.iscoroutine(coro_or_future): # 判断是否是协程 在本例中是协程 if loop is None: # 检查loop是否为空 loop = events.get_event_loop() # 为空则创建一个新的loop实例 task = loop.create_task(coro_or_future) # 初始化一个task if task._source_traceback: del task._source_traceback[-1] return task # 返回task实例 elif futures.isfuture(coro_or_future): # 是否是future if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('loop argument must agree with Future') return coro_or_future elif inspect.isawaitable(coro_or_future): # 是否是await类 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) # 如果是则通过包装后继续生成一个task实例 else: raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' 'required') # 都不是则报错
在本例中main是否一个coroutine则直接调用了loop.create_task(coro_or_future)来生成一个task实例,此时就调用了base_events.BaseEventLoop的create_task方法;
def create_task(self, coro): """Schedule a coroutine object. Return a task object. """ self._check_closed() # 检查是否关闭 if self._task_factory is None: # 如果_task_factory为空则使用tasks.Task类初始化生成一个该类实例 task = tasks.Task(coro, loop=self) # 生成task实例 if task._source_traceback: del task._source_traceback[-1] else: task = self._task_factory(self, coro) # 如果配置则使用配置的_task_factory return task # 返回task实例
此时我们分析一下tasks.Task的初始化过程;
class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. """A coroutine wrapped in a Future.""" # An important invariant maintained while a Task not done: # # - Either _fut_waiter is None, and _step() is scheduled; # - or _fut_waiter is some Future, and _step() is *not* scheduled. # # The only transition from the latter to the former is through # _wakeup(). When _fut_waiter is not None, one of its callbacks # must be _wakeup(). # If False, don't log a message if the task is destroyed whereas its # status is still pending _log_destroy_pending = True ... def __init__(self, coro, *, loop=None): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] if not coroutines.iscoroutine(coro): # raise after Future.__init__(), attrs are required for __del__ # prevent logging for pending task in __del__ self._log_destroy_pending = False raise TypeError(f"a coroutine was expected, got {coro!r}") self._must_cancel = False self._fut_waiter = None self._coro = coro self._context = contextvars.copy_context() # 获取执行上下文 self._loop.call_soon(self.__step, context=self._context) # 调用loop的call_soon方法将__step方法传入执行 _register_task(self) ... def __step(self, exc=None): if self.done(): # 检查是否已经完成 raise futures.InvalidStateError( f'_step(): already done: {self!r}, {exc!r}') if self._must_cancel: if not isinstance(exc, futures.CancelledError): exc = futures.CancelledError() self._must_cancel = False coro = self._coro # 获取协程 self._fut_waiter = None _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: # We use the `send` method directly, because coroutines # don't have `__iter__` and `__next__` methods. result = coro.send(None) # 通过协程调用send去执行,result就是返回的future或者报错的值 else: result = coro.throw(exc) except StopIteration as exc: if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False super().set_exception(futures.CancelledError()) else: super().set_result(exc.value) # 如果StopIteration,则返回函数最后的返回值 except futures.CancelledError: super().cancel() # I.e., Future.cancel(self). except Exception as exc: super().set_exception(exc) # 报错则抛出错误 except BaseException as exc: super().set_exception(exc) raise else: blocking = getattr(result, '_asyncio_future_blocking', None) # 如果成功则获取result的_asyncio_future_blocking属性 if blocking is not None: # 如果不为空 # Yielded Future must come from Future.__iter__(). if futures._get_loop(result) is not self._loop: # 检查是否是同一个loop 不是则报错 new_exc = RuntimeError( f'Task {self!r} got Future ' f'{result!r} attached to a different loop') self._loop.call_soon( self.__step, new_exc, context=self._context) elif blocking: # 如果为True if result is self: # 如果结果为自己则报错 new_exc = RuntimeError( f'Task cannot await on itself: {self!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) else: result._asyncio_future_blocking = False # 设置为False result.add_done_callback( self.__wakeup, context=self._context) # 添加self.__wakeup到回调函数列表中 self._fut_waiter = result # 设置_fut_waiter为result if self._must_cancel: # 检查是否需要取消 if self._fut_waiter.cancel(): self._must_cancel = False else: new_exc = RuntimeError( f'yield was used instead of yield from ' f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) # 其他情况都报错处理 elif result is None: # 如果result为空 # Bare yield relinquishes control for one event loop iteration. self._loop.call_soon(self.__step, context=self._context) # 继续调用__step elif inspect.isgenerator(result): # 检查是否是生成器 # Yielding a generator is just wrong. new_exc = RuntimeError( f'yield was used instead of yield from for ' f'generator in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) # 是生成器则报错 else: # Yielding something else is an error. new_exc = RuntimeError(f'Task got bad yield: {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) # 否则就抛错误的Task finally: _leave_task(self._loop, self) # 弹出该task self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): try: future.result() # 先获取future的值 except Exception as exc: # This may also be a cancellation. self.__step(exc) # 如果报错则直接报错处理 else: # Don't pass the value of `future.result()` explicitly, # as `Future.__iter__` and `Future.__await__` don't need it. # If we call `_step(value, None)` instead of `_step()`, # Python eval loop would use `.send(value)` method call, # instead of `__next__()`, which is slower for futures # that return non-generator iterators from their `__iter__`. self.__step() # 继续调用step执行 self = None # Needed to break cycles when an exception occurs.调用了base_events.BaseEventLoop类的call_soon方法 def call_soon(self, callback, *args, context=None): """Arrange for a callback to be called as soon as possible. This operates as a FIFO queue: callbacks are called in the order in which they are registered. Each callback will be called exactly once. Any positional arguments after the callback will be passed to the callback when it is called. """ self._check_closed() # 检查是否关闭 if self._debug: # 是否是调试模式 self._check_thread() self._check_callback(callback, 'call_soon') handle = self._call_soon(callback, args, context) # 调用_call_soon方法,包装传入的方法 if handle._source_traceback: del handle._source_traceback[-1] return handle # 返回 def _call_soon(self, callback, args, context): handle = events.Handle(callback, args, self, context) # 实例化一个handler类 if handle._source_traceback: del handle._source_traceback[-1] self._ready.append(handle) # 添加到_ready队列中 return handle # 返回handle
其中首先初始化了一个Task实例,在初始化的过程中就调用了loop的call_soon方法,该方法就先执行了传入的main()函数,先给该函数send值None此时就开始执行了,此时就执行到了asyncio.open_connection处,此时分析该代码;
async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. The reader returned is a StreamReader instance; the writer is a StreamWriter instance. The arguments are all the usual arguments to create_connection() except protocol_factory; most common are positional host and port, with various optional keyword arguments following. Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the StreamReader). (If you want to customize the StreamReader and/or StreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.) """ if loop is None: # 判断是否为空 loop = events.get_event_loop() # 为空则获取loop reader = StreamReader(limit=limit, loop=loop) # 初始化一个读实例 protocol = StreamReaderProtocol(reader, loop=loop) # 初始化一个读实例 transport, _ = await loop.create_connection( lambda: protocol, host, port, **kwds) # 创建连接 writer = StreamWriter(transport, protocol, reader, loop) # 初始化一个写实例 return reader, writer # 返回读和写
此时首先初始化了reader和protocol然后进入到loop.create_connections来创建连接,
async def create_connection( self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None): """Connect to a TCP server. Create a streaming transport connection to a given Internet host and port: socket family AF_INET or socket.AF_INET6 depending on host (or family if specified), socket type SOCK_STREAM. protocol_factory must be a callable returning a protocol instance. This method is a coroutine which will try to establish the connection in the background. When successful, the coroutine returns a (transport, protocol) pair. """ if server_hostname is not None and not ssl: # 检查连接是否是ssl raise ValueError('server_hostname is only meaningful with ssl') if server_hostname is None and ssl: # 检查是否传入host # Use host as default for server_hostname. It is an error # if host is empty or not set, e.g. when an # already-connected socket was passed or when only a port # is given. To avoid this error, you can pass # server_hostname='' -- this will bypass the hostname # check. (This also means that if host is a numeric # IP/IPv6 address, we will attempt to verify that exact # address; this will probably fail, but it is possible to # create a certificate for a specific IP address, so we # don't judge it here.) if not host: raise ValueError('You must set server_hostname ' 'when using ssl without a host') server_hostname = host if ssl_handshake_timeout is not None and not ssl: # 检查是否在ssl时传入time_out raise ValueError( 'ssl_handshake_timeout is only meaningful with ssl') if host is not None or port is not None: # 如果Host不为空或者port不为空 if sock is not None: raise ValueError( 'host/port and sock can not be specified at the same time') infos = await self._ensure_resolved( (host, port), family=family, type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) # 解析Host信息 if not infos: raise OSError('getaddrinfo() returned empty list') if local_addr is not None: laddr_infos = await self._ensure_resolved( local_addr, family=family, type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) if not laddr_infos: raise OSError('getaddrinfo() returned empty list') exceptions = [] for family, type, proto, cname, address in infos: try: sock = socket.socket(family=family, type=type, proto=proto) # 尝试连接 sock.setblocking(False) # 设置连接为非阻塞的 if local_addr is not None: for _, _, _, _, laddr in laddr_infos: try: sock.bind(laddr) # 监听端口 break except OSError as exc: msg = ( f'error while attempting to bind on ' f'address {laddr!r}: ' f'{exc.strerror.lower()}' ) exc = OSError(exc.errno, msg) exceptions.append(exc) else: sock.close() # 关闭端口 sock = None continue if self._debug: logger.debug("connect %r to %r", sock, address) await self.sock_connect(sock, address) # 获取连接 except OSError as exc: if sock is not None: sock.close() exceptions.append(exc) except: if sock is not None: sock.close() raise else: break else: if len(exceptions) == 1: raise exceptions[0] else: # If they all have the same str(), raise one. model = str(exceptions[0]) if all(str(exc) == model for exc in exceptions): raise exceptions[0] # Raise a combined exception so the user can see all # the various error messages. raise OSError('Multiple exceptions: {}'.format( ', '.join(str(exc) for exc in exceptions))) else: if sock is None: raise ValueError( 'host and port was not specified and no sock specified') if sock.type != socket.SOCK_STREAM: # We allow AF_INET, AF_INET6, AF_UNIX as long as they # are SOCK_STREAM. # We support passing AF_UNIX sockets even though we have # a dedicated API for that: create_unix_connection. # Disallowing AF_UNIX in this method, breaks backwards # compatibility. raise ValueError( f'A Stream Socket was expected, got {sock!r}') transport, protocol = await self._create_connection_transport( sock, protocol_factory, ssl, server_hostname, ssl_handshake_timeout=ssl_handshake_timeout) # 创建连接 if self._debug: # Get the socket from the transport because SSL transport closes # the old socket and creates a new SSL socket sock = transport.get_extra_info('socket') logger.debug("%r connected to %s:%r: (%r, %r)", sock, host, port, transport, protocol) return transport, protocol # 返回连接
此时继续分析,self.sock_connect函数相关内容;
async def sock_connect(self, sock, address): """Connect to a remote socket at address. This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: # 检查是否在调试模式下是阻塞连接 raise ValueError("the socket must be non-blocking") if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: resolved = await self._ensure_resolved( address, family=sock.family, proto=sock.proto, loop=self) _, _, _, _, address = resolved[0] # 解析地址 fut = self.create_future() # 创建future self._sock_connect(fut, sock, address) # 调用连接 return await fut # 返回该futdef _sock_connect(self, fut, sock, address): fd = sock.fileno() # 获取文件描述符 try: sock.connect(address) # 连接该地址 except (BlockingIOError, InterruptedError): # Issue #23618: When the C function connect() fails with EINTR, the # connection runs in background. We have to wait until the socket # becomes writable to be notified when the connection succeed or # fails. fut.add_done_callback( functools.partial(self._sock_connect_done, fd)) # 如果报错则添加_sock_connect_done到执行完成的会回调函数列表中 self.add_writer(fd, self._sock_connect_cb, fut, sock, address) # 添加写事件到loop中注册回调方法_sock_connect_cb except Exception as exc: fut.set_exception(exc) # 如果报错则直接报错 else: fut.set_result(None) # 如果此时连接成功则直接让task进行下一步def _sock_connect_done(self, fd, fut): self.remove_writer(fd) # 当连接完成后从监听列表中删除该文件描述符def _sock_connect_cb(self, fut, sock, address): if fut.cancelled(): # 检查是否已经取消 return try: err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) # 获取连接信息 if err != 0: # Jump to any except clause below. raise OSError(err, f'Connect call failed {address}') except (BlockingIOError, InterruptedError): # socket is still registered, the callback will be retried later # 如果当前还不能连接则继续不调用fut去执行下一步 pass except Exception as exc: fut.set_exception(exc) # 如果其他异常 else: fut.set_result(None) # 连接成功则执行下一步,让task继续执行
如果此时连接失败则会注册一个可写事件到循环中,此时就查看send之后的操作,此时继续返回run_until_complete函数,此时就执行到self.run_forever()处,此时的逻辑代码执行过程如下;
def run_forever(self): """Run until stop() is called.""" self._check_closed() # 检查是否关闭 if self.is_running(): # 检查是否已经在运行 raise RuntimeError('This event loop is already running') if events._get_running_loop() is not None: # 获取Loop raise RuntimeError( 'Cannot run the event loop while another loop is running') self._set_coroutine_origin_tracking(self._debug) # 设置是否为调试模式 self._thread_id = threading.get_ident() # 获取当前线程id old_agen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) try: events._set_running_loop(self) # 设置当前运行的loop while True: self._run_once() # 运行 if self._stopping: break finally: self._stopping = False self._thread_id = None events._set_running_loop(None) self._set_coroutine_origin_tracking(False) sys.set_asyncgen_hooks(*old_agen_hooks) def _run_once(self): """Run one full iteration of the event loop. This calls all currently ready callbacks, polls for I/O, schedules the resulting callbacks, and finally schedules 'call_later' callbacks. """ sched_count = len(self._scheduled) # 获取定时器相关的跳读 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # 检查待执行的定时任务并重新移动到一个调度列表中 # Remove delayed calls that were cancelled if their number # is too high new_scheduled = [] for handle in self._scheduled: if handle._cancelled: handle._scheduled = False else: new_scheduled.append(handle) heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # Remove delayed calls that were cancelled from head of queue. while self._scheduled and self._scheduled[0]._cancelled: # 移除已经超时的任务 self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False timeout = None if self._ready or self._stopping: # 是否有已经准备的任务 timeout = 0 # 如果有则设置为0 elif self._scheduled: # 是否有可调度的任务 # Compute the desired timeout. when = self._scheduled[0]._when # 获取调度任务的时间 timeout = max(0, when - self.time()) # 比较timeout与下一次需要执行的任务的时间 if self._debug and timeout != 0: # 是否为调试模式并且timeout不为0 t0 = self.time() event_list = self._selector.select(timeout) dt = self.time() - t0 if dt >= 1.0: level = logging.INFO else: level = logging.DEBUG nevent = len(event_list) if timeout is None: logger.log(level, 'poll took %.3f ms: %s events', dt * 1e3, nevent) elif nevent: logger.log(level, 'poll %.3f ms took %.3f ms: %s events', timeout * 1e3, dt * 1e3, nevent) elif dt >= 1.0: logger.log(level, 'poll %.3f ms took %.3f ms: timeout', timeout * 1e3, dt * 1e3) else: event_list = self._selector.select(timeout) # 调用IO复用 self._process_events(event_list) # 处理读写事件 # Handle 'later' callbacks that are ready. end_time = self.time() + self._clock_resolution # 获取当前时间 while self._scheduled: # 遍历待执行任务列表 handle = self._scheduled[0] # 获取定时任务 if handle._when >= end_time: # 如果时间还未到则停止 break handle = heapq.heappop(self._scheduled) # 如果时间到了 handle._scheduled = False self._ready.append(handle) # 添加到_ready队列中 # This is the only place where callbacks are actually *called*. # All other places just add them to ready. # Note: We run all currently scheduled callbacks, but not any # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). # Use an idiom that is thread-safe without using locks. ntodo = len(self._ready) # 获取队列长度 for i in range(ntodo): handle = self._ready.popleft() # 弹出handle if handle._cancelled: # 如果handle取消则循环下一个 continue if self._debug: # 是否是调试模式 try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) finally: self._current_handle = None else: handle._run() # 调用回调函数进行处理 handle = None # Needed to break cycles when an exception occurs.此时调用的是BaseSelectorEventLoop的_process_events方法 def _process_events(self, event_list): for key, mask in event_list: # 遍历任务列表 fileobj, (reader, writer) = key.fileobj, key.data # 获取值 if mask & selectors.EVENT_READ and reader is not None: # 如果可读 if reader._cancelled: # 如果读取消了 self._remove_reader(fileobj) # 移除该文件监听 else: self._add_callback(reader) # 否则就添加到回调函数列表中 if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: self._remove_writer(fileobj) else: self._add_callback(writer)此时调用了BaseEventLoop的_add_callback方法 def _add_callback(self, handle): """Add a Handle to _scheduled (TimerHandle) or _ready.""" assert isinstance(handle, events.Handle), 'A Handle is required here' if handle._cancelled: # 如果任务取消则返回 return assert not isinstance(handle, events.TimerHandle) self._ready.append(handle) # 添加到准备好列表中
此时由于已经注册了写事件到列表中,此时就只需等待IO复用的事件通知,通知完成后则调用_process_events函数处理事件,然后通过_add_callback将唤醒的时间添加到_ready列表中去,待此时可以连接之后就执行了fut.set_result(None) ,此时查看Future类的定义;
def set_result(self, result): """Mark the future done and set its result. If the future is already done when this method is called, raises InvalidStateError. """ if self._state != _PENDING: # 检查状态是否还是未执行 raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result # 设置返回结果 self._state = _FINISHED # 设置状态为完成 self.__schedule_callbacks() # 调用回调方法def __schedule_callbacks(self): """Internal: Ask the event loop to call all callbacks. The callbacks are scheduled to be called as soon as possible. Also clears the callback list. """ callbacks = self._callbacks[:] # 获取回调方法 if not callbacks: return self._callbacks[:] = [] for callback, ctx in callbacks: # 依次遍历添加到可执行列表中 self._loop.call_soon(callback, self, context=ctx)def result(self): """Return the result this future represents. If the future has been cancelled, raises CancelledError. If the future's result isn't yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised. """ if self._state == _CANCELLED: raise CancelledError if self._state != _FINISHED: raise InvalidStateError('Result is not ready.') self.__log_traceback = False if self._exception is not None: raise self._exception return self._result # 获取返回值def __await__(self): if not self.done(): # 检查是否完成 self._asyncio_future_blocking = True # 设置标志位true yield self # This tells Task to wait for completion. if not self.done(): # 如果未完成则报错 raise RuntimeError("await wasn't used with future") return self.result() # May raise too. # 最后返回fut设置的result__iter__ = __await__ # make compatible with 'yield from'.
由于在Task的初始化过程中将__step传入回调函数中,在send值None后获得的fut中也添加了__wakeup函数作为回调函数,此时在传入call_soon后就直接调用了__wakeup函数,然后又继续send推进到下一步执行,从而达到了在IO复用的注册的回调函数中,通过调用fut的set_result方法继续推进Task的协程继续向下执行,这也就是Python支持的异步编程的主要的思路。后续的读和写的方法,同理一样的执行流程,限于本文长度就不再本文中继续展开分析。
总结
Python提供的asyncio模块,是更高层的IO复用和协程的封装,其本质也是使用了yield关键字作为协程的执行和流程推进方式,从而大大的方便了用户去编写异步程序,该模块的基本原理都是基于此来实现,其他提供的IO的操作都是基于此原理进行扩展编写完成,大家有兴趣可自行去查看源码学习。鉴于本人才疏学浅,如有疏漏请批评指正。
转载地址:https://blog.csdn.net/qq_33339479/article/details/86592042 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!