
zmq pub-sub通信之ipc双向主题
异步非阻塞:Pub-Sub模式支持异步通信,客户端和服务端可以同时进行多个操作,不会因为等待消息而阻塞。 消息分发:服务端自动将发布的消息分发给所有订阅的客户端,客户端无需主动连接服务端。 高效可靠:ZMQ采用零拷贝技术,消息传输效率高,且连接可靠性强。
发布日期:2021-05-17 18:23:20
浏览次数:22
分类:精选文章
本文共 2620 字,大约阅读时间需要 8 分钟。
ZMQ Pub-Sub模式是零消息队列(ZeroMQ)中的一种高效通信机制,采用push-pull模式工作。与传统的socket通信不同,ZMQ Pub-Sub模式的服务端(pub)和客户端(sub)之间没有启动顺序的限制,服务端必须先于客户端启动才能正常通信。这一特性使得Pub-Sub模式在分布式系统中具有显著优势。
ZMQ Pub-Sub模式的特点
服务端(Pub.py)与客户端(Sub.py)的工作原理
- 服务端(Pub.py):负责接收消息并发布到指定的主题(topic)。
- 客户端(Sub.py):订阅特定主题的消息并接收推送。
优化后的代码解读
import zmqimport timeimport threading# 定义ZMQ地址LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"TOPIC_LIST = ["lator", "att"]def unlink_ipc(path): index = path.find('ipc://') if index == -1: return fpath = path[index+5:] if os.path.exists(fpath): os.unlink(fpath)def pub(pubaddr, topic): context = zmq.Context() sock = context.socket(zmq.PUB) sock.set_hwm(100) # 设置消息最大大小 unlink_ipc(pubaddr) sock.bind(pubaddr) os.chmod(pubaddr[len('ipc://'):], stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU) while True: messagedata = "this is msg from topic %s %s" % (topic, str(counter)) print("%s %s" % (topic, messagedata)) sock.send("%s %s" % (topic, messagedata)) counter += 1 time.sleep(1)if __name__ == "__main__": t1 = threading.Thread(target=pub, args=(LOG_TYPE_PUB_PATH, "lator")) t2 = threading.Thread(target=pub, args=(LOG_SUB_PATH, "att")) t1.start() t2.start() t1.join() t2.join()
客户端(Sub.py)代码解读
import zmqimport osfrom zmq.eventloop.ioloop import IOLoopfrom zmq.eventloop.zmqstream import ZMQStreamLOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"TOPIC_LIST = ["lator", "att"]def unlink_ipc(path): index = path.find('ipc://') if index == -1: return fpath = path[index+5:] if os.path.exists(fpath): os.unlink(fpath)def recv_func(msg): print(msg)def main2(): loop = IOLoop.instance() ctx = zmq.Context.instance() sock = ctx.socket(zmq.SUB) sock.set_hwm(100) sock.connect(LOG_TYPE_PUB_PATH) sock.connect(LOG_SUB_PATH) for topic in TOPIC_LIST: if isinstance(topic, str): sock.setsockopt(zmq.SUBSCRIBE, topic) elif isinstance(topic, unicode): sock.setsockopt_string(zmq.SUBSCRIBE, topic) else: print("无法设置订阅:%s" % topic) sock = ZMQStream(sock, loop) sock.on_recv(recv_func) loop.start()if __name__ == "__main__": main2()
总结
ZMQ Pub-Sub模式通过服务端自动推送消息给客户端,极大简化了客户端的连接逻辑,适用于分布式系统中的异步通信需求。服务端和客户端的启动顺序必须注意,服务端必须先于客户端启动。通过优化代码结构和添加合理的日志处理,可以进一步提升ZMQ Pub-Sub模式的性能和可靠性。