服务端异步IO配合协程浅析
发布日期:2021-07-25 13:04:21 浏览次数:7 分类:技术文章

本文共 4826 字,大约阅读时间需要 16 分钟。

服务端异步IO配合协程浅析

代码如下

#coding:utf-8import socketfrom selectors import DefaultSelector, EVENT_READ, EVENT_WRITEselector = DefaultSelector()stopped = Falseresponse = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n'response += b'hello world'class Future:    def __init__(self):        self.result = None        self._callback = []    def add_done_callback(self, fn):        self._callback.append(fn)    def set_result(self, result):        self.result = result        for fn in self._callback:            fn(self)    def __iter__(self):        yield self        return self.resultclass Task:    def __init__(self, coro):        self.coro = coro        f = Future()        f.set_result(None)        self.step(f)    def step(self, future):        try:            next_future = self.coro.send(future.result)            print("next_future: ", next_future)        except StopIteration as e:            print("step stop :", e.value)            return        next_future.add_done_callback(self.step)class ConHandler(object):    def __init__(self, con, addr):        self.con = con                                      # 新连接实例        self.addr = addr                                    # 新连接地址    def _read_ready(self):        f = Future()        def sock_read():                                    # 由于没有解析相应的流协议,所以这里直接接受所有的数据,                                                            # 如果解析具体协议也可以改写成协程            all_data = b""                                  # 接受数据            while True:                                     # 一直接受数据直到数据接受完成                try:                    data = self.con.recv(10)                    if data:                        print("recv data : ", data)                        all_data += data                    else:                        print("break while")                        break                except BlockingIOError:                    print("BlockingIOError")                    break            print("all_data", all_data)            # selector.unregister(self.con.fileno())            f.set_result(None)                              # 当数据接受完成后,调用f实例的回调方法,进入将主流程往下执行一步        selector.register(self.con.fileno(), EVENT_READ, sock_read)     # 注册连接的读事件并注册回调函数        yield f        selector.unregister(self.con.fileno())              # 取消连接的注册事件    def fetch(self):        # 读事件到来一次读完        fu = yield from self._read_ready()                  # 调用读数据        print("recv_data: ", fu)        # handler  处理请求        self.response = response*10000        result = yield from self.sock_send_all()            # 将处理的数据发送出去        print("send after : ", result)    def sock_send_all(self):        send_length = yield from self.sock_send()           # 发送相应数据,返回发送的数据长度        self.response = self.response[send_length:]         # 减去已经发送的数据长度        while send_length:                                  # 如果发送长度不为0            send_length = yield from self.sock_send()       # 继续发送数据            self.response = self.response[send_length:]     # 减去已经发送的数据长度        return    def sock_send(self):        f = Future()        def _sock_send():            con_length = self.con.send(self.response)            f.set_result(con_length)        selector.register(self.con.fileno(), EVENT_WRITE, _sock_send)        send_length = yield f        selector.unregister(self.con.fileno())        return send_lengthdef create_server():    sock = socket.socket()                                      # 创建连接    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 设置端口重用    sock.bind(('0.0.0.0', 9003))                                # 设置监听连接端口    sock.listen(10)                                             # 设置监听缓存区    sock.setblocking(0)                                         # 设置连接非阻塞    def sock_accept():                                          # 如果sock的新请求连接处理回调函数        try:            conn, addr = sock.accept()                          # 接受新请求            conn.setblocking(False)                             # 设置接受新连接非阻塞            print(conn)        except Exception:            pass        Task(ConHandler(conn, addr).fetch())                # 用任务类包裹处理流程,使之流程协程处理    selector.register(sock.fileno(), EVENT_READ, sock_accept)   # 注册sock的连接读事件并设置读事件的回调函数def loop():    while not stopped:        events = selector.select()                              # 获取触发的事件        for event_key, event_mask in events:            callback = event_key.data                           # 获取触发事件的回调函数            callback()                                          # 执行回调函数if __name__ == '__main__':    create_server()    loop()

基于上一篇的异步协程爬虫,现在服务端的大概原理是:

1.注册服务端连接的读事件和读回调函数,
2.读回调函数,将接受的新请求传入接受处理函数,并包装成Task类执行,
3.注册新连接的读事件,接受完成数据后,调用Task往下执行,
4.处理完成后,将处理的数据写出时,注册新连接的写事件,如果数据未写完继续执行,循环执行,直到数据写出完成
5.取消连接的事件监听
6.继续等待新连接的介入
至此上述代码的执行流程已完成。

转载地址:https://blog.csdn.net/qq_33339479/article/details/78599186 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Django源码分析1:创建项目和应用分析
下一篇:异步爬虫框架与协程浅析

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2024年04月06日 19时24分01秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

iOS UILabel根据字符串长度自动适应宽度和高度 2019-04-26
细说JVM内存模型 2019-04-26
谈谈设计模式:建造者模式在jdk中的体现,它和工厂模式区别? 2019-04-26
走进Java架构!找工作,去小公司好,还是大公司好- 2019-04-26
趁热打铁!想要轻松搞定MySQL事务?一招解决分布式一致性 2019-04-26
改造RequestMappingHandlerMapping,使spring boot2的controller可以继承夫类的@RequestMapping 并追加路径到当前mapping前 2019-04-26
我的世界Bukkit服务器插件开发---快速开发框架---快速开发自己的服务器插件 2019-04-26
我的世界Bukkit服务器插件开发---用java 10分钟快速开发一个自己的服务器插件 2019-04-26
快速搭建Spring Cloud项目,含Spring Gateway、Config Server、Oauth、Mongodb、MyBatis、Redis、Feigh 2019-04-26
搭建Spring cloud项目---搭建Consul 2019-04-26
SpringBoot-SpringCloud分布式事务-初稿-等项目写完开源后再来改为终版 2019-04-26
搭建docker私有镜像仓库(帐号密码登录) 2019-04-26
java获取文件的mime,java获取文件是不是文本,java获取文件类型(非后缀方式) 2019-04-26
冰蝎V3.0Beta9-fixed客户端JAVA源码与配套服务端 2019-04-26
图片后缀和ContentType大全 2019-04-26
卸载notpad++,改用vscode,将VS Code添加到右键文件菜单 2019-04-26
win10最强DLL注入工具,远程线程注入、消息钩子注入、输入法注入、EIP注入、注册表注入、APC注入(APC好像不能用) 2019-04-26
JAVA反编译工具(界面版本的)-JAR反编译-war反编译-war逆向工程 2019-04-26
快速搭建SDN开发环境:ONOS开发环境 2019-04-26
OpenCV图像处理技术(Python)——几何变换 2019-04-26