zmq pub-sub通信之ipc双向主题
发布日期:2021-05-17 18:23:20 浏览次数:22 分类:精选文章

本文共 2620 字,大约阅读时间需要 8 分钟。

ZMQ Pub-Sub模式是零消息队列(ZeroMQ)中的一种高效通信机制,采用push-pull模式工作。与传统的socket通信不同,ZMQ Pub-Sub模式的服务端(pub)和客户端(sub)之间没有启动顺序的限制,服务端必须先于客户端启动才能正常通信。这一特性使得Pub-Sub模式在分布式系统中具有显著优势。

ZMQ Pub-Sub模式的特点

  • 异步非阻塞:Pub-Sub模式支持异步通信,客户端和服务端可以同时进行多个操作,不会因为等待消息而阻塞。
  • 消息分发:服务端自动将发布的消息分发给所有订阅的客户端,客户端无需主动连接服务端。
  • 高效可靠:ZMQ采用零拷贝技术,消息传输效率高,且连接可靠性强。
  • 服务端(Pub.py)与客户端(Sub.py)的工作原理

    • 服务端(Pub.py):负责接收消息并发布到指定的主题(topic)。
    • 客户端(Sub.py):订阅特定主题的消息并接收推送。

    优化后的代码解读

    import zmq
    import time
    import threading
    # 定义ZMQ地址
    LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
    LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
    TOPIC_LIST = ["lator", "att"]
    def unlink_ipc(path):
    index = path.find('ipc://')
    if index == -1:
    return
    fpath = path[index+5:]
    if os.path.exists(fpath):
    os.unlink(fpath)
    def pub(pubaddr, topic):
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.set_hwm(100) # 设置消息最大大小
    unlink_ipc(pubaddr)
    sock.bind(pubaddr)
    os.chmod(pubaddr[len('ipc://'):], stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
    while True:
    messagedata = "this is msg from topic %s %s" % (topic, str(counter))
    print("%s %s" % (topic, messagedata))
    sock.send("%s %s" % (topic, messagedata))
    counter += 1
    time.sleep(1)
    if __name__ == "__main__":
    t1 = threading.Thread(target=pub, args=(LOG_TYPE_PUB_PATH, "lator"))
    t2 = threading.Thread(target=pub, args=(LOG_SUB_PATH, "att"))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    客户端(Sub.py)代码解读

    import zmq
    import os
    from zmq.eventloop.ioloop import IOLoop
    from zmq.eventloop.zmqstream import ZMQStream
    LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
    LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
    TOPIC_LIST = ["lator", "att"]
    def unlink_ipc(path):
    index = path.find('ipc://')
    if index == -1:
    return
    fpath = path[index+5:]
    if os.path.exists(fpath):
    os.unlink(fpath)
    def recv_func(msg):
    print(msg)
    def main2():
    loop = IOLoop.instance()
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.set_hwm(100)
    sock.connect(LOG_TYPE_PUB_PATH)
    sock.connect(LOG_SUB_PATH)
    for topic in TOPIC_LIST:
    if isinstance(topic, str):
    sock.setsockopt(zmq.SUBSCRIBE, topic)
    elif isinstance(topic, unicode):
    sock.setsockopt_string(zmq.SUBSCRIBE, topic)
    else:
    print("无法设置订阅:%s" % topic)
    sock = ZMQStream(sock, loop)
    sock.on_recv(recv_func)
    loop.start()
    if __name__ == "__main__":
    main2()

    总结

    ZMQ Pub-Sub模式通过服务端自动推送消息给客户端,极大简化了客户端的连接逻辑,适用于分布式系统中的异步通信需求。服务端和客户端的启动顺序必须注意,服务端必须先于客户端启动。通过优化代码结构和添加合理的日志处理,可以进一步提升ZMQ Pub-Sub模式的性能和可靠性。

    上一篇:python nose setuptools 快速入门
    下一篇:Go错误和异常处理

    发表评论

    最新留言

    表示我来过!
    [***.240.166.169]2025年04月28日 06时29分44秒