[Source Code Analysis] Message Queue Kombu: mailbox
Published: 2021-05-09 06:16:45



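0x00 Abstract

This series introduces the message queue library Kombu. Kombu positions itself as a messaging abstraction compatible with the AMQP protocol. This article explains the mailbox concept in Kombu and, along the way, revisits the material covered in the earlier articles of the series.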

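0x01 Example Code

The example code in this article comes from an external source and is split into two parts.

Node can be understood as the broadcast consumer, and Client as the broadcast initiator.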

1.1 Node

import sys

import kombu
from kombu import pidbox

hostname = "localhost"
connection = kombu.Connection('redis://localhost:6379')
# type="fanout" matches the exchange type used in the rest of this walkthrough.
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()


def callback(body, message):
    # Print the decoded payload and the message object.
    print(body)
    print(message)


def main(arguments):
    consumer = node.listen(callback=callback)
    try:
        while True:
            print('Consumer Waiting')
            connection.drain_events()
    finally:
        consumer.cancel()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

1.2 Client

import sys

import kombu
from kombu import pidbox


def callback():
    print("callback")


def main(arguments):
    connection = kombu.Connection('redis://localhost:6379')
    # type="fanout" matches the exchange type used in the rest of this walkthrough.
    mailbox = pidbox.Mailbox("testMailbox", type="fanout")
    bound = mailbox(connection)
    # Broadcast the "print_msg" command with its arguments to the mailbox exchange.
    bound._broadcast("print_msg", {'msg': 'Message for you'})


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

0x02 Core Idea

The broadcast feature is built on the Redis PubSub mechanism.

2.1 Redis PubSub

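To support message multicast, Redis provides a dedicated module: PubSub.

Redis acts as the server between publishers and subscribers, bridging the two. Redis has the concept of a channel: a publisher publishes a message to a named channel, and every subscriber currently subscribed to that channel receives it.

Several consumers can be started at once. PubSub delivers the same sequence of messages to each of them, as long as they are subscribed when the messages are published; messages are not stored for subscribers that connect later.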
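The fan-out behaviour described above can be illustrated without a Redis server. The sketch below is a minimal in-memory stand-in and not the Redis API: `MiniPubSub`, its methods, and the channel names are hypothetical names chosen purely for illustration.

```python
from collections import defaultdict


class MiniPubSub:
    """In-memory stand-in for a pub/sub broker (hypothetical, not the Redis API)."""

    def __init__(self):
        # channel name -> list of subscriber callbacks
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        # Deliver to every current subscriber and report how many received it.
        receivers = list(self._subscribers[channel])
        for callback in receivers:
            callback(message)
        return len(receivers)


broker = MiniPubSub()
inbox_a, inbox_b = [], []
broker.subscribe("news", inbox_a.append)
broker.subscribe("news", inbox_b.append)

delivered = broker.publish("news", "hello")
missed = broker.publish("other", "nobody listens")
broker.publish("news", "world")
print(inbox_a, inbox_b, delivered, missed)
```

Both subscribers see the same sequence, and a message published to a channel without subscribers is simply dropped, which mirrors the fire-and-forget nature of PubSub.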

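2.2 Overview

Kombu's mailbox implementation has two parts: the Consumer and the Producer.

On the Consumer side, when a listener is registered in Kombu's Channel class, the Channel uses the PubSub support of the redis driver to subscribe the consumer to a key. This ties the Consumer's queue and callback to Redis through the Channel, so that messages can later be read from Redis.

The call stack at the point of subscription is:

psubscribe, client.py:3542
_subscribe, redis.py:664
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29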
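To make "subscribing the consumer to a key" more concrete, here is a rough sketch of how such a key could be derived. It assumes the key is simply a per-database fanout prefix followed by the exchange name; the `/0.` prefix matches the `keyprefix_fanout` value that appears in the Channel dump later in this article. `fanout_topic` is a hypothetical helper, and the real Redis transport may add further parts (for example a routing pattern).

```python
def fanout_topic(exchange, db=0):
    """Hypothetical helper: build the PubSub key for a fanout exchange."""
    # Assumption: the prefix has the form '/<db>.', e.g. '/0.' for database 0.
    keyprefix_fanout = "/{}.".format(db)
    return keyprefix_fanout + exchange


topic = fanout_topic("testMailbox.pidbox")
print(topic)
```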

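0x03 Consumer

Following the example code, we now walk through how the broadcast is carried out, step by step.

3.1 Creating the Connection

Once the following code has run, the Connection exists.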

connection = kombu.Connection('redis://localhost:6379')

As the figure below shows, we split the problem domain into a user scope and a Kombu scope to make things easier to follow:

user scope                 +            kombu scope
                           |
                           |
+------------+             |            +--------------------------------------+
| connection | +----------------------> | Connection: redis://localhost:6379// |
+------------+             |            +--------------------------------------+
                           |
                           |
                           |
                           |
                           |
                           +

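3.2 Creating the mailbox

Once the following code has run, both the Connection and the mailbox exist.

connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")

At this point, however, the two are not yet linked, and some of the mailbox's member variables do not hold meaningful values yet.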

An example of the mailbox's variables:

mailbox = {Mailbox}
  accept = {list: 1} ['json']
  clock = {LamportClock} 0
  connection = {NoneType} None
  exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
  exchange_fmt = {str} '%s.pidbox'
  namespace = {str} 'testMailbox'
  node_cls = {type}
  oid = {str} '9386a23b-ae96-3c6c-b036-ae7646455ebb'
  producer_pool = {NoneType} None
  queue_expires = {NoneType} None
  queue_ttl = {NoneType} None
  reply_exchange = {Exchange} Exchange reply.testMailbox.pidbox(direct)
  reply_exchange_fmt = {str} 'reply.%s.pidbox'
  reply_queue = {Queue} ... -> 9386a23b-ae96-3c6c-b036-ae7646455ebb>
  reply_queue_expires = {float} 10.0
  reply_queue_ttl = {NoneType} None
  serializer = {NoneType} None
  type = {str} 'fanout'
  unclaimed = {defaultdict: 0} defaultdict(..., {})
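The two format strings in the dump explain where the exchange names come from. As a small sketch, assuming the names are produced by plain `%` formatting of the namespace (`mailbox_names` is a hypothetical helper, not part of Kombu):

```python
EXCHANGE_FMT = "%s.pidbox"              # exchange_fmt in the dump
REPLY_EXCHANGE_FMT = "reply.%s.pidbox"  # reply_exchange_fmt in the dump


def mailbox_names(namespace):
    """Hypothetical helper: derive both exchange names from the namespace."""
    return {
        "exchange": EXCHANGE_FMT % namespace,
        "reply_exchange": REPLY_EXCHANGE_FMT % namespace,
    }


names = mailbox_names("testMailbox")
print(names)
```

The result lines up with the `exchange` and `reply_exchange` entries shown in the dump.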

The logic at this point is:

user scope        +      kombu scope
                  |
                  |
+------------+    |       +--------------------------------------+
| Connection|-----------> | Connection: redis://localhost:6379// |
+------------+    |       +--------------------------------------+
                  |
                  |
                  |                                           +----------------------------+
                  |                                           | Exchange                   |
                  |       +--------------------------+   +--> |                            |
+---------+       |       | Mailbox                  |   |    | testMailbox.pidbox(fanout) |
| mailbox|--------------> |                          |   |    +----------------------------+
+---------+       |       |                          |   |
                  |       |        exchange  +-----------+    +---------------------------------+
                  |       |                          |        | Exchange                        |
                  |       |        reply_exchange +-------->  |                                 |
                  |       |                          |        | reply.testMailbox.pidbox(direct)|
                  |       |        reply_queue +---------+    +-------------------+-------------+
                  |       |                          |   |                        ^
                  |       |                          |   |    +--------+          |
                  |       +--------------------------+   +--> | Queue  +----------+
                  |                                           +--------+
                  |
                  +


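3.3 Creating the Node

Once the following code has run, the Connection, the mailbox, and the node exist.

Node is a concept inside the mailbox; it can be understood as one concrete mailbox.

connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})

An example of the node's variables: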

node = {Node}
  channel = {NoneType} None
  handlers = {dict: 0} {}
  hostname = {str} 'localhost'
  mailbox = {Mailbox}
  state = {dict: 1} {'a': 'b'}
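The `handlers` dict and the `state` in the dump hint at how a node could dispatch an incoming command. The sketch below assumes a handler is looked up by method name and called with the node state plus the message arguments. `MiniNode` is a hypothetical class written for illustration, not Kombu's `Node`; note also that the example in this article passes its own callback to `listen` rather than registering a handler.

```python
class MiniNode:
    """Hypothetical stand-in for a mailbox node with named handlers."""

    def __init__(self, hostname, state=None):
        self.hostname = hostname
        self.state = state or {}
        self.handlers = {}

    def handler(self, fun):
        # Register the function under its own name and return it unchanged.
        self.handlers[fun.__name__] = fun
        return fun

    def handle(self, method, arguments):
        # Look up the handler by method name; pass state plus keyword arguments.
        return self.handlers[method](self.state, **arguments)


node = MiniNode("localhost", state={"a": "b"})


@node.handler
def print_msg(state, msg):
    return "{} (state={})".format(msg, state)


result = node.handle("print_msg", {"msg": "Message for you"})
print(result)
```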

The logic is shown below:

user scope        +
                  |
                  |
+------------+    |       +--------------------------------------+
| Connection|-----------> | Connection: redis://localhost:6379// |
+------------+    |       +--------------------------------------+
                  |
                  |
                  |                                               +----------------------------+
                  |                                               | Exchange                   |
                  |       +------------------------------+   +--> |                            |
+---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |
| mailbox|--------------> |                              |   |    +----------------------------+
+---------+       |       |                              |   |
                  |       |        exchange  +---------------+    +---------------------------------+
                  |       |                              |        | Exchange                        |
                  |       |        reply_exchange +------------>  |                                 |
                  |       |                              |        | reply.testMailbox.pidbox(direct)|
                  |       |        reply_queue +-------------+    +-------------------+-------------+
                  |       |                              |   |                        ^
                  |       |                              |   |    +--------+          |
                  |       +-------------------------+----+   +--> | Queue  +----------+
                  |                                 ^             +--------+
                  |                                 |
                  |       +---------------------+   |
+-----+           |       |                     |   |
|node | +---------------->+ Node      channel   |   |
+-----+           |       |                     |   |
                  |       |           mailbox +-----+
                  |       |                     |
                  +       +---------------------+


3.4 Creating the channel

Only after the following code has run does the channel exist.

connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()

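3.4.1 Relationships

The relationship between Channel, Connection, and Transport is as follows:

  • Connection: an abstraction of the connection to the MQ; one Connection corresponds to one MQ connection. Connection is the AMQP-style wrapper around a connection;
  • Channel: similar to the AMQP concept; it can be understood as one of several lightweight connections that share a single Connection. Channel is the AMQP-style wrapper around operations on the MQ;
  • Transport: Kombu lets different message brokers be plugged in flexibly and uses the term transport for one concrete broker, so a Transport can be seen as an abstraction of the broker:
    • Operating on the MQ always requires a connection, but Kombu does not let the Channel use the Connection directly to send or receive requests. Instead it introduces a further abstraction, the Transport, which is responsible for the concrete MQ operations; in other words, every Channel operation ends up being executed by the Transport. This abstraction makes it very simple to add non-AMQP transports later;
    • The Transport is the real MQ connection, the instance that actually connects to the MQ (redis/rabbitmq), and it is what distinguishes one underlying message queue implementation from another;
    • Kombu currently has built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ, and Pyro;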
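The layering described in the list above can be sketched with three tiny classes. This is only an illustration of the delegation chain (Connection hands out Channels, Channels delegate the actual work to the Transport); `MiniConnection`, `MiniTransport`, and `MiniChannel` are hypothetical names, and the "broker" is just a dict.

```python
class MiniChannel:
    """Lightweight handle; it performs no I/O itself."""

    def __init__(self, transport, channel_id):
        self.transport = transport
        self.channel_id = channel_id

    def publish(self, queue, body):
        # Every channel operation is handed to the transport.
        self.transport.deliver(queue, body)


class MiniTransport:
    """Stands in for a broker driver; the 'broker' here is a plain dict."""

    def __init__(self):
        self.queues = {}
        self.channels = []

    def create_channel(self):
        channel = MiniChannel(self, len(self.channels) + 1)
        self.channels.append(channel)
        return channel

    def deliver(self, queue, body):
        self.queues.setdefault(queue, []).append(body)


class MiniConnection:
    """User-facing object; owns one transport and hands out channels."""

    def __init__(self, url):
        self.url = url
        self.transport = MiniTransport()

    def channel(self):
        return self.transport.create_channel()


conn = MiniConnection("redis://localhost:6379")
ch1, ch2 = conn.channel(), conn.channel()
ch1.publish("q", "one")
ch2.publish("q", "two")
print(conn.transport.queues)
```

Swapping the broker would only mean swapping `MiniTransport`, which is the point of the extra abstraction.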

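3.4.2 Channel

The Transport keeps two lists of channels:

self._avail_channels
self.channels

If _avail_channels is not empty, a channel is taken from it directly; otherwise a new Channel is created.

When the connection is actually established, establish_connection is called and places a channel in self._avail_channels.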

def establish_connection(self):
    # creates channel to verify connection.
    # this channel is then used as the next requested channel.
    # (returned by ``create_channel``).
    self._avail_channels.append(self.create_channel(self))
    return self     # for drain events
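The reuse rule described above (take a channel from `_avail_channels` if one is waiting, otherwise create a new one) can be sketched as follows. `PooledTransport` and `_new_channel` are hypothetical names, and channels are plain strings here to keep the example self-contained.

```python
class PooledTransport:
    """Hypothetical sketch of the two channel lists kept by a transport."""

    def __init__(self):
        self.channels = []          # every channel ever created
        self._avail_channels = []   # created but not yet handed out
        self._next_id = 0

    def _new_channel(self):
        self._next_id += 1
        channel = "channel-{}".format(self._next_id)
        self.channels.append(channel)
        return channel

    def establish_connection(self):
        # Create one channel up front; it becomes the next requested channel.
        self._avail_channels.append(self._new_channel())
        return self

    def create_channel(self):
        try:
            return self._avail_channels.pop()
        except IndexError:
            return self._new_channel()


transport = PooledTransport().establish_connection()
first = transport.create_channel()    # reuses the pre-created channel
second = transport.create_channel()   # nothing available, so a new one is made
print(first, second, transport.channels)
```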

The key initialization code of Channel is:

class Channel(virtual.Channel):
    """Redis Channel."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
        self.Client = self._get_client()
        self.active_fanout_queues = set()
        self.auto_delete_queues = set()
        self._fanout_to_queue = {}
        self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

        # add to channel poller, i.e. join the message loop.
        self.connection.cycle.add(self)

        if register_after_fork is not None:
            register_after_fork(self, _after_fork_cleanup_channel)

3.4.3 MultiChannelPoller

MultiChannelPoller is defined below. It can be thought of as the execution engine, and its main jobs are:

  • collecting channels;
  • building the fd-to-channel map;
  • building the channel-to-socket map;
  • polling with poll;

class MultiChannelPoller:

    def __init__(self):
        # active channels
        self._channels = set()
        # file descriptor -> channel map.
        self._fd_to_chan = {}
        # channel -> socket map
        self._chan_to_sock = {}
        # poll implementation (epoll/kqueue/select)
        self.poller = poll()
        # one-shot callbacks called after reading from socket.
        self.after_read = set()

    def add(self, channel):
        self._channels.add(channel)
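The role of the two maps becomes clearer with a runnable miniature. The sketch below uses the standard library `selectors` module and local socket pairs instead of Kombu's own `poll()` wrapper and Redis sockets; `MiniPoller` and its methods are hypothetical, and the real class stores more per file descriptor than just the channel.

```python
import selectors
import socket


class MiniPoller:
    """Hypothetical miniature of a multi-channel poller."""

    def __init__(self):
        self._channels = set()
        self._fd_to_chan = {}      # file descriptor -> channel
        self._chan_to_sock = {}    # channel -> socket
        self.poller = selectors.DefaultSelector()

    def add(self, channel, sock):
        self._channels.add(channel)
        self._chan_to_sock[channel] = sock
        self._fd_to_chan[sock.fileno()] = channel
        self.poller.register(sock, selectors.EVENT_READ)

    def ready_channels(self, timeout=1.0):
        # Translate ready file descriptors back into channels.
        return [self._fd_to_chan[key.fd] for key, _ in self.poller.select(timeout)]

    def close(self):
        self.poller.close()


poller = MiniPoller()
reader_a, writer_a = socket.socketpair()
reader_b, writer_b = socket.socketpair()
poller.add("channel-a", reader_a)
poller.add("channel-b", reader_b)

writer_b.sendall(b"ping")          # only channel-b has data to read
ready = poller.ready_channels()
print(ready)

poller.close()
for sock in (reader_a, writer_a, reader_b, writer_b):
    sock.close()
```

One poll call watches all sockets at once, and the fd-to-channel map tells the loop which channel should handle the data.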

Finally, an example of the Channel's variables:

self = {Channel}
  Client = {type}
  Message = {type}
  QoS = {type}
  active_fanout_queues = {set: 0} set()
  active_queues = {set: 0} set()
  async_pool = {ConnectionPool} ConnectionPool<...>
  auto_delete_queues = {set: 0} set()
  body_encoding = {str} 'base64'
  channel_id = {int} 1
  client = {Redis} Redis<...>
  connection = {Transport}
  cycle = {FairCycle}
  exchange_types = {dict: 3} {'direct': <...>, 'topic': <...>, 'fanout': <...>}
  handlers = {dict: 2} {'BRPOP': <...>, 'LISTEN': <...>}
  health_check_interval = {int} 25
  keyprefix_fanout = {str} '/0.'
  keyprefix_queue = {str} '_kombu.binding.%s'
  pool = {ConnectionPool} ConnectionPool<...>
  priority_steps = {list: 4} [0, 3, 6, 9]
  qos = {QoS}
  queue_order_strategy = {str} 'round_robin'
  state = {BrokerState}
  subclient = {PubSub}

And an example of the Transport's variables:

connection = {Transport}
  Channel = {type}
  Cycle = {type}
  Management = {type}
  channels = {list: 1} [<...>]
  client = {Connection}
  connection_errors = {tuple: 8} (<...>, <...>, <...>, <...>, <...>, <...>, <...>, <...>)
  cycle = {MultiChannelPoller}
    after_read = {set: 0} set()
    eventflags = {int} 25
    fds = {dict: 0} {}
    poller = {_poll}
  driver_name = {str} 'redis'
  driver_type = {str} 'redis'
  implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
  manager = {Management}
  state = {BrokerState}

The logic at this point is:

                                                                      +-----------------------+    +-----------------------+
                                                                      | Transport             |    | MultiChannelPoller    |
user scope        +                                                   |                       |    |                       |
                  |       +--------------------------------------+    |            cycle +-------> |           _channels +----+
                  |       |                                      |    |                       |    +-----------------------+  |
+------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        |
| Connection|-----------> |                                      |    |                       |      |                        |
+------------+    |       |                                      |    |     _avail_channels+---------+                        |
                  |       |                       connection+-------> |                       |      |                        |
                  |       |                                      |    +-----------------------+      |                        v
                  |       +--------------------------------------+                                   |      +-----------------+---+
                  |                                                                                  +----->+ Channel             |     +-----------+
                  |                                               +----------------------------+            |            cycle +------> | FairCycle |
                  |                                               | Exchange                   |            |                     |     |           |
                  |       +------------------------------+   +--> |                            |            |                     |     +-----------+
+---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |            |           handlers+-----+
| mailbox|--------------> |                              |   |    +----------------------------+            +----+----------------+   |
+---------+       |       |                              |   |                                                   ^                    |
                  |       |        exchange  +---------------+    +---------------------------------+            |                    v
                  |       |                              |        | Exchange                        |            |    +---------------+---------------+
                  |       |        reply_exchange +------------>  |                                 |            |    |  'BRPOP': Channel._brpop_read |
                  |       |                              |        | reply.testMailbox.pidbox(direct)|            |    |                               |
                  |       |        reply_queue +-------------+    +-------------------+-------------+            |    |  'LISTEN': Channel._receive   |
                  |       |                              |   |                        ^                          |    |                               |
                  |       |                              |   |    +--------+          |                          |    +-------------------------------+
                  |       +-------------------------+----+   +--> | Queue  +----------+                          |
                  |                                 ^             +--------+                                     |
                  |                                 |                                                            |
                  |       +---------------------+   |                                                            |
+-----+           |       |                     |   |                                                            |
|node | +---------------->+ Node      channel+--------------------------------------------------------------------+
+-----+           |       |                     |   |
                  |       |           mailbox +-----+
                  |       |                     |
                  +       +---------------------+


3.5 Creating the Consumer

The following code creates a Consumer together with its Queue. In other words, the broadcast still depends on a Consumer, or rather builds on Consumer functionality.

def main(arguments):
    consumer = node.listen(callback=callback)

The code of listen is:

def listen(self, channel=None, callback=None):
    consumer = self.Consumer(channel=channel,
                             callbacks=[callback or self.handle_message],
                             on_decode_error=self.on_decode_error)
    consumer.consume()
    return consumer

The corresponding Queue variables at this point:

queue = {Queue} <... -> ...>
  ContentDisallowed = {type}
  alias = {NoneType} None
  auto_delete = {bool} True
  binding_arguments = {NoneType} None
  bindings = {set: 0} set()
  can_cache_declaration = {bool} False
  channel = {str} 'line 178, in _getPyDictionary\n attr = getattr(var, n)\n File "
  consumer_arguments = {NoneType} None
  durable = {bool} False
  exchange = {Exchange} Exchange testMailbox.pidbox(fanout)

The logic is:

                                                                       +-----------------------+    +-----------------------+
                                                                       | Transport             |    | MultiChannelPoller    |
 user scope        +                                                   |                       |    |                       |
                   |       +--------------------------------------+    |            cycle +-------> |           _channels +----+
                   |       |                                      |    |                       |    +-----------------------+  |
 +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        |
 | Connection|-----------> |                                      |    |                       |      |                        |
 +------------+    |       |                                      |    |     _avail_channels+---------+                        |
                   |       |                       connection+-------> |                       |      |                        |
                   |       |                                      |    +-----------------------+      |                        v
                   |       +--------------------------------------+                                   |      +-----------------+---+
                   |                                                                                  +----->+ Channel             |     +-----------+
                   |                                               +----------------------------+            |            cycle +------> | FairCycle |
                   |                                               | Exchange                   |            |                     |     |           |
                   |       +------------------------------+   +--> |                            |  <-----+   |                     |     +-----------+
 +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |           handlers+-----+
 | mailbox|--------------> |                              |   |    +----------------------------+        |   +-+--+----------------+   |
 +---------+       |       |                              |   |                                          |     ^  ^                    |
                   |       |        exchange  +---------------+    +---------------------------------+   |     |  |                    v
                   |       |                              |        | Exchange                        |   |     |  |    +---------------+---------------+
                   |       |        reply_exchange +------------>  |                                 |   |     |  |    |  'BRPOP': Channel._brpop_read |
                   |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |    |                               |
                   |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |    |  'LISTEN': Channel._receive   |
                   |       |                              |   |                        ^                 |     |  |    |                               |
                   |       |                              |   |    +--------+          |                 |     |  |    +-------------------------------+
                   |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |
                   |                                 ^             +--------+                            |     |  |
                   |                                 |                                                   |     |  |
                   |       +---------------------+   |                                                   |     |  |
 +-----+           |       |                     |   |                                                   |     |  |
 |node | +---------------->+ Node      channel+--------------------------------------------------------------------+
 +-----+           |       |                     |   |                                                   |     |
                   |       |           mailbox +-----+                                                   |     |
                   |       |                     |        +----------------------------------------------------+
                   |       +---------------------+        |                                              |
                   |                                      |                                              |
                   |                                      |                                              |
                   |       +------------------------+     |         +-----------------------------------------------------------------------------+
+----------+       |       |                        |     |         | Queue                              |                                        |
| consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |
+----------+       |       |                        |               |                                 exchange                                    |
                   |       |             queues  +--------------->  |                                                                             |
                   |       |                        |               |                                                                             |
+-----------+      |       |             callbacks  |               |     Exchange testMailbox.pidbox(fanout)>                                    |
| callback  |      |       |                  +     |               |                                                                             |
+------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+
       ^           |                          |
       |           |                          |
       +--------------------------------------+
                   |
                   +


3.5.1 Writing the binding to Redis

At this point the binding is written to Redis so that it can be used for routing later.

The call stack is:

sadd, client.py:2243
_queue_bind, redis.py:817
queue_bind, base.py:568
bind_to, entity.py:674
queue_bind, entity.py:662
_create_queue, entity.py:617
declare, entity.py:606
declare, messaging.py:417
revive, messaging.py:404
__init__, messaging.py:382
Consumer, pidbox.py:78
listen, pidbox.py:91
main, node.py:20
<module>, node.py:29

The logic is shown below; Redis now enters the picture.

 user scope        +      Kombu                                        +-----------------------+    +-----------------------+                              +          redis
                   |                                                   | Transport             |    | MultiChannelPoller    |                              |
                   |                                                   |                       |    |                       |                              |
                   |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                           |
                   |       |                                      |    |                       |    +-----------------------+  |                           |
 +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        |                           |
 | Connection|-----------> |                                      |    |                       |      |                        |                           |
 +------------+    |       |                                      |    |     _avail_channels+---------+                        |                           |
                   |       |                       connection+-------> |                       |      |                        |                           |
                   |       |                                      |    +-----------------------+      |                        v                           |
                   |       +--------------------------------------+                                   |      +-----------------+---+                       |
                   |                                                                                  +----->+ Channel             |     +-----------+     |
                   |                                               +----------------------------+            |            cycle +------> | FairCycle |     |
                   |                                               | Exchange                   |            |                     |     |           |     |
                   |       +------------------------------+   +--> |                            |  <-----+   |                     |     +-----------+     |
 +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |           handlers+-----+                   |
 | mailbox|--------------> |                              |   |    +----------------------------+        |   +-+--+----------------+   |                   |
 +---------+       |       |                              |   |                                          |     ^  ^                    |                   |
                   |       |        exchange  +---------------+    +---------------------------------+   |     |  |                    v                   |
                   |       |                              |        | Exchange                        |   |     |  |    +---------------+---------------+   |
                   |       |        reply_exchange +------------>  |                                 |   |     |  |    |  'BRPOP': Channel._brpop_read |   |
                   |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |    |                               |   |
                   |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |    |  'LISTEN': Channel._receive   |   |
                   |       |                              |   |                        ^                 |     |  |    |                               |   |
                   |       |                              |   |    +--------+          |                 |     |  |    +-------------------------------+   |
                   |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |                                        |
                   |                                 ^             +--------+                            |     |  |                                        |
                   |                                 |                                                   |     |  |                                        |  +----------------------------------------------------+
                   |       +---------------------+   |                                                   |     |  |                                        |  |      _kombu.binding.testMailbox.pidbox             |
 +-----+           |       |                     |   |                                                   |     |  |                                        |  |                                                    |
 |node | +---------------->+ Node      channel+--------------------------------------------------------------------+                                        |  |                                                    |
 +-----+           |       |                     |   |                                                   |     |                                           |  |   "\x06\x16\x06\x16localhost.testMailbox.pidbox"   |
                   |       |           mailbox +-----+                                                   |     |                                           |  |                                                    |
                   |       |                     |        +----------------------------------------------------+                                           |  +---------+------------------------------------------+
                   |       +---------------------+        |                                              |                                                 |            ^
                   |                                      |                                              |                                                 |            |
                   |                                      |                                              |                                                 |            |
                   |       +------------------------+     |         +-----------------------------------------------------------------------------+        |            |
+----------+       |       |                        |     |         | Queue                              |                                        |        |            |
| consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |        |            |
+----------+       |       |                        |               |                                 exchange                                    |        |            |
                   |       |             queues  +--------------->  |                                                                             |        |            |
                   |       |                        |               |                                                                             | +-------------------+
+-----------+      |       |             callbacks  |               |     Exchange testMailbox.pidbox(fanout)>                                    |        |
| callback  |      |       |                  +     |               |                                                                             |        |
+------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+        |
       ^           |                          |                                                                                                            |
       |           |                          |                                                                                                            |
       +--------------------------------------+                                                                                                            +
                   |
                   +
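The binding record shown in the diagram can be reproduced in a few lines. This sketch assumes the set member is the routing key, the pattern, and the queue name joined by the `\x06\x16` separator, stored under a key built from `keyprefix_queue`. `binding_entry` is a hypothetical helper, and a plain dict of sets stands in for Redis.

```python
SEP = "\x06\x16"                        # separator visible in the diagram
KEYPREFIX_QUEUE = "_kombu.binding.%s"   # keyprefix_queue from the Channel dump


def binding_entry(exchange, routing_key, pattern, queue):
    """Hypothetical helper: build the (key, member) pair for one binding."""
    key = KEYPREFIX_QUEUE % exchange
    member = SEP.join([routing_key or "", pattern or "", queue or ""])
    return key, member


fake_redis = {}   # key -> set of members, standing in for SADD
key, member = binding_entry("testMailbox.pidbox", "", "",
                            "localhost.testMailbox.pidbox")
fake_redis.setdefault(key, set()).add(member)

# Reading the binding back for routing is a simple split.
routing_key, pattern, queue = member.split(SEP)
print(key, repr(member), queue)
```

With an empty routing key and pattern, the member starts with two separators, which is exactly the shape of the string in the redis box of the diagram.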


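3.5.2 Configuration

Execution now reaches kombu/transport/virtual/base.py, which does the following:

  • adds the consumer's queue to the Channel;
  • adds the callback to the Channel;
  • adds the Consumer to the cycle;

In this way, the Consumer's queue and callback are tied together through the Channel.

The code is: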

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)

    self._reset_cycle()
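The wiring done by basic_consume can be tried out in isolation. The sketch below is a simplified, hypothetical `MiniChannel`: it keeps the wrapped callbacks on the channel itself (the real code stores them on the transport via `self.connection._callbacks`), uses raw dicts instead of Message objects, and records unacknowledged messages in a plain list instead of a QoS object.

```python
class MiniChannel:
    """Hypothetical, simplified version of the consume bookkeeping."""

    def __init__(self):
        self._tag_to_queue = {}
        self._active_queues = []
        self._callbacks = {}     # queue name -> wrapped callback
        self._consumers = set()
        self.unacked = []        # stand-in for the QoS bookkeeping

    def basic_consume(self, queue, no_ack, callback, consumer_tag):
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)

        def _callback(raw_message):
            # Track the message until it is acknowledged, then hand it on.
            if not no_ack:
                self.unacked.append(raw_message)
            return callback(raw_message)

        self._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)

    def deliver(self, queue, raw_message):
        # What the event loop would do once a message arrives for `queue`.
        return self._callbacks[queue](raw_message)


received = []
channel = MiniChannel()
channel.basic_consume("localhost.testMailbox.pidbox", no_ack=False,
                      callback=received.append, consumer_tag="tag-1")
channel.deliver("localhost.testMailbox.pidbox", {"msg": "Message for you"})
print(received, channel._active_queues)
```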

The call stack is:

basic_consume, base.py:635
basic_consume, redis.py:598
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
listen, pidbox.py:92
main, node.py:20
<module>, node.py:29

We are still inside the Channel at this point:

self = {Channel}
  Client = {type}
  Message = {type}
  active_fanout_queues = {set: 1} {'localhost.testMailbox.pidbox'}
  active_queues = {set: 0} set()
  async_pool = {ConnectionPool} ConnectionPool<...>
  auto_delete_queues = {set: 1} {'localhost.testMailbox.pidbox'}
  body_encoding = {str} 'base64'
  channel_id = {int} 1
  client = {Redis} Redis<...>
  closed = {bool} False
  codecs = {dict: 1} {'base64': <...>}
  connection = {Transport}
  cycle = {FairCycle}
  deadletter_queue = {NoneType} None
  default_priority = {int} 0
  do_restore = {bool} True
  exchange_types = {dict: 3} {'direct': <...>, 'topic': <...>, 'fanout': <...>}
  handlers = {dict: 2} {'BRPOP': <...>, 'LISTEN': <...>}
  keyprefix_fanout = {str} '/0.'
  keyprefix_queue = {str} '_kombu.binding.%s'
  pool = {ConnectionPool} ConnectionPool<...>
  priority_steps = {list: 4} [0, 3, 6, 9]
  qos = {QoS}
  queue_order_strategy = {str} 'round_robin'
  state = {BrokerState}
  subclient = {PubSub}

The cycle itself is set up as follows:

def _reset_cycle(self):
    self._cycle = FairCycle(
        self._get_and_deliver, self._active_queues, Empty)

FairCycle is defined as follows:

class FairCycle:
    """Cycle between resources.

    Consume from a set of resources, where each resource gets
    an equal chance to be consumed from.

    Arguments:
        fun (Callable): Callback to call.
        resources (Sequence[Any]): List of resources.
        predicate (type): Exception predicate.
    """

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        """Get from next resource."""
        for tried in count(0):  # for infinity
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # reraise when retries exhausted.
                if tried >= len(self.resources) - 1:
                    raise

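The cycle's callback (fun, which _reset_cycle sets to the channel's _get_and_deliver) and its resources are as follows:

fun = {method}
resources = {list: 1} ['localhost.testMailbox.pidbox']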
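To see how FairCycle moves past an empty resource, here is a small standalone sketch. The class is repeated in condensed form only so that the snippet runs without Kombu installed; the pending dictionary and get_and_deliver function are hypothetical stand-ins for real queues and for the channel's _get_and_deliver.

```python
from itertools import count
from queue import Empty


class FairCycle:
    """Condensed copy of the class shown above, so the sketch runs standalone."""

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while True:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        for tried in count(0):
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # Re-raise once every resource has been tried.
                if tried >= len(self.resources) - 1:
                    raise


# Hypothetical in-memory queues: "q1" is empty, "q2" holds two messages.
pending = {"q1": [], "q2": ["m1", "m2"]}


def get_and_deliver(queue, callback):
    """Pop one message from the queue, or raise Empty if there is none."""
    if not pending[queue]:
        raise Empty()
    callback(pending[queue].pop(0), queue)


delivered = []
cycle = FairCycle(get_and_deliver, ["q1", "q2"], Empty)
cycle.get(lambda msg, queue: delivered.append((queue, msg)))
cycle.get(lambda msg, queue: delivered.append((queue, msg)))
print(delivered)
```

Each call to get tries the resources in turn and only propagates Empty after all of them have come up empty, which is why the empty "q1" does not block "q2".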

The logic is as follows:

[Diagram, summarized as text; it spans three columns: user scope, Kombu, redis]

  • Connection (user scope) → Connection: redis://localhost:6379// → Transport;
  • Transport.cycle → MultiChannelPoller; MultiChannelPoller._channels → Channel; Transport.channels and Transport._avail_channels → Channel;
  • Channel._active_queues → Queue; Channel.cycle → FairCycle; Channel.handlers → {'BRPOP': Channel._brpop_read, 'LISTEN': Channel._receive};
  • Mailbox.exchange → Exchange testMailbox.pidbox(fanout); Mailbox.reply_exchange → Exchange reply.testMailbox.pidbox(direct); Mailbox.reply_queue → Queue bound to the reply exchange;
  • Node.channel → Channel; Node.mailbox → Mailbox;
  • Consumer.channel → Channel; Consumer.queues → Queue (exchange: testMailbox.pidbox(fanout)); Consumer.callbacks → callback;
  • redis: the key _kombu.binding.testMailbox.pidbox holds "\x06\x16\x06\x16localhost.testMailbox.pidbox".


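3.5.3 Configuring load balancing

Back in the redis transport's Channel class, the last step configures load balancing, i.e. it decides which Queue the next message will be taken from.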

def basic_consume(self, queue, *args, **kwargs):
    if queue in self._fanout_queues:
        exchange, _ = self._fanout_queues[queue]
        self.active_fanout_queues.add(queue)
        self._fanout_to_queue[exchange] = queue
    ret = super().basic_consume(queue, *args, **kwargs)

    # Update fair cycle between queues.
    #
    # We cycle between queues fairly to make sure that
    # each queue is equally likely to be consumed from,
    # so that a very busy queue will not block others.
    #
    # This works by using Redis's `BRPOP` command and
    # by rotating the most recently used queue to the
    # end of the list.  See Kombu github issue #166 for
    # more discussion of this method.
    self._update_queue_cycle()
    return ret

def _update_queue_cycle(self):
    self._queue_cycle.update(self.active_queues)

The call stack is as follows:

update, scheduling.py:75
_update_queue_cycle, redis.py:1018
basic_consume, redis.py:610
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
listen, pidbox.py:92
main, node.py:20
<module>, node.py:29

The strategy is as follows:

class round_robin_cycle:
    """Iterator that cycles between items in round-robin."""

    def __init__(self, it=None):
        self.items = it if it is not None else []

    def update(self, it):
        """Update items from iterable."""
        self.items[:] = it

    def consume(self, n):
        """Consume n items."""
        return self.items[:n]

    def rotate(self, last_used):
        """Move most recently used item to end of list."""
        items = self.items
        try:
            items.append(items.pop(items.index(last_used)))
        except ValueError:
            pass
        return last_used
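A short usage sketch may make the rotation concrete. The class is repeated here only so the snippet runs without Kombu; the queue names q1, q2 and q3 are made up for the illustration.

```python
class round_robin_cycle:
    """Copy of the strategy class above, repeated so the sketch is standalone."""

    def __init__(self, it=None):
        self.items = it if it is not None else []

    def update(self, it):
        self.items[:] = it

    def consume(self, n):
        return self.items[:n]

    def rotate(self, last_used):
        items = self.items
        try:
            items.append(items.pop(items.index(last_used)))
        except ValueError:
            pass
        return last_used


cycle = round_robin_cycle()
cycle.update(["q1", "q2", "q3"])   # hypothetical queue names

order_before = cycle.consume(3)    # order in which the queues are offered
cycle.rotate("q1")                 # pretend a message just came from q1
order_after = cycle.consume(3)     # q1 has moved to the back

print(order_before, order_after)
```

After a queue has served a message it moves to the end of the list, so the other queues are offered first on the next read; this is the fairness that the comment in basic_consume describes.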

The logic is as follows:

[Diagram, summarized as text: the same object graph as in the previous figure (Connection → Transport → MultiChannelPoller → Channel, plus Mailbox, Node and Consumer), with one addition: Channel._queue_cycle → round_robin_cycle, next to Channel.cycle → FairCycle and Channel.handlers → {'BRPOP': Channel._brpop_read, 'LISTEN': Channel._receive}.]


3.6 Consumption

3.6.1 Main consumption loop

The following code performs the consumption.

def main(arguments):
    consumer = node.listen(callback=callback)
    try:
        while True:
            print('Consumer Waiting')
            connection.drain_events()
    finally:
        consumer.cancel()

Concretely, messages are read inside drain_events, whose code is as follows:

def drain_events(self, connection, timeout=None):
    time_start = monotonic()
    get = self.cycle.get
    polling_interval = self.polling_interval
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try:
            get(self._deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break
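The retry-until-timeout behaviour of this loop can be tried out in isolation. The sketch below is an adaptation, not Kombu's signature: get and deliver are passed in as parameters, polling_interval defaults to an arbitrary 0.01 seconds, and flaky_get is a hypothetical source that is empty on its first two polls.

```python
import socket
from queue import Empty
from time import monotonic, sleep


def drain_events(get, deliver, timeout=None, polling_interval=0.01):
    """Call get() until it delivers something or the timeout expires."""
    time_start = monotonic()
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while True:
        try:
            get(deliver, timeout=timeout)
        except Empty:
            # Nothing available: give up after the timeout, else wait and retry.
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break


attempts = []
delivered = []


def flaky_get(callback, timeout=None):
    """Hypothetical source: empty on the first two polls, then one message."""
    attempts.append(monotonic())
    if len(attempts) < 3:
        raise Empty()
    callback("Message for you", "localhost.testMailbox.pidbox")


drain_events(
    flaky_get,
    lambda message, queue: delivered.append((queue, message)),
    timeout=1,
)
print(len(attempts), delivered)
```

One call to drain_events returns as soon as a single get succeeds, which is why the example Node wraps it in a while True loop.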

3.6.2 Business logic

3.6.2.1 Registration

The get method does the following (note that every consumption goes through one call to get, so registration and consumption are repeated each time):

  • registers the response mode;
  • performs the poll, which is the generic step, in either BRPOP or LISTEN mode;
  • calls handle_event to read from redis and do the actual consumption;

def get(self, callback, timeout=None):
    self._in_protected_read = True
    try:
        for channel in self._channels:
            if channel.active_queues:           # BRPOP mode?
                if channel.qos.can_consume():
                    self._register_BRPOP(channel)
            if channel.active_fanout_queues:    # LISTEN mode?
                self._register_LISTEN(channel)

        events = self.poller.poll(timeout)
        if events:
            for fileno, event in events:
                ret = self.handle_event(fileno, event)  # actually read from redis and consume
                if ret:
                    return
        # - no new data, so try to restore messages.
        # - reset active redis commands.
        self.maybe_restore_messages()
        raise Empty()

    finally:
        self._in_protected_read = False
        while self.after_read:
            try:
                fun = self.after_read.pop()
            except KeyError:
                break
            else:
                fun()

Because pub/sub is used here, _register_LISTEN calls channel._subscribe to register the subscription:

def _register_LISTEN(self, channel):
    """Enable LISTEN mode for channel."""
    if not self._client_registered(channel, channel.subclient, 'LISTEN'):
        channel._in_listen = False
        self._register(channel, channel.subclient, 'LISTEN')
    if not channel._in_listen:
        channel._subscribe()  # send SUBSCRIBE

The class involved is:

self = {MultiChannelPoller}

_register ties together the channel and the socket fd: when the poller reports an event on that fd, the corresponding channel is invoked.

def _register(self, channel, client, type):
    if (channel, client, type) in self._chan_to_sock:
        self._unregister(channel, client, type)
    if client.connection._sock is None:   # not connected yet.
        client.connection.connect()
    sock = client.connection._sock
    self._fd_to_chan[sock.fileno()] = (channel, type)
    self._chan_to_sock[(channel, client, type)] = sock
    self.poller.register(sock, self.eventflags)
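The fd-to-channel mapping can be sketched with the standard library alone. This is an illustration under assumptions: Python's selectors module stands in for Kombu's own poller, a socket pair stands in for the Redis connection, and the names fd_to_chan, register and "channel-1" are hypothetical.

```python
import selectors
import socket

selector = selectors.DefaultSelector()
fd_to_chan = {}  # fileno -> (channel name, handler type)

# A connected socket pair stands in for the connection to the server.
server_side, client_side = socket.socketpair()


def register(channel, sock, handler_type):
    """Remember which channel owns the socket, then watch it for reads."""
    fd_to_chan[sock.fileno()] = (channel, handler_type)
    selector.register(sock, selectors.EVENT_READ)


register("channel-1", client_side, "LISTEN")

# The "server" pushes data, which makes the client socket readable.
server_side.sendall(b"message")

handled = []
for key, _events in selector.select(timeout=1):
    # Map the ready fd back to its channel and handler type.
    channel, handler_type = fd_to_chan[key.fd]
    handled.append((channel, handler_type, key.fileobj.recv(1024)))

selector.close()
server_side.close()
client_side.close()
print(handled)
```

The poller only knows about file descriptors, so the dictionary is what lets a readiness event be routed to the right channel and to the right handler ('BRPOP' or 'LISTEN' in the real code).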

_subscribe is where the subscription is actually registered with redis.

With that, the consumer is connected both to redis and to the poller, so consumption can begin.

def _subscribe(self):
    keys = [self._get_subscribe_topic(queue)
            for queue in self.active_fanout_queues]
    if not keys:
        return
    c = self.subclient
    if c.connection._sock is None:
        c.connection.connect()
    self._in_listen = c.connection
    c.psubscribe(keys)

The call stack is as follows:

_subscribe, redis.py:663
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29

The relevant variables are as follows; here the client c is the redis driver's PubSub object:

c = {PubSub}
keys = {list: 1} ['/0.testMailbox.pidbox']
self = {Channel}
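The effect of subscribing several consumers to the same topic can be modelled with a toy, in-memory pub/sub. Everything here is an assumption made for illustration: ToyPubSub and fanout_topic are hypothetical, the topic is simply the '/0.' prefix plus the exchange name as seen in the variables above, and fnmatchcase only approximates Redis glob patterns.

```python
from fnmatch import fnmatchcase


class ToyPubSub:
    """In-memory stand-in for a pub/sub server with pattern subscriptions."""

    def __init__(self):
        self.subscriptions = []  # list of (pattern, callback)

    def psubscribe(self, pattern, callback):
        self.subscriptions.append((pattern, callback))

    def publish(self, topic, payload):
        """Deliver payload to every matching subscriber; return the count."""
        receivers = 0
        for pattern, callback in self.subscriptions:
            if fnmatchcase(topic, pattern):
                callback(topic, payload)
                receivers += 1
        return receivers


def fanout_topic(keyprefix_fanout, exchange):
    """Build the topic as prefix + exchange name, matching the dump above."""
    return keyprefix_fanout + exchange


server = ToyPubSub()
topic = fanout_topic("/0.", "testMailbox.pidbox")

# Two hypothetical nodes subscribe to the same topic.
inbox_a, inbox_b = [], []
server.psubscribe(topic, lambda t, payload: inbox_a.append(payload))
server.psubscribe(topic, lambda t, payload: inbox_b.append(payload))

count = server.publish(topic, {"method": "print_msg"})
print(topic, count, inbox_a, inbox_b)
```

Every subscriber of the topic receives its own copy of the message, which is the broadcast behaviour the mailbox relies on.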

The logic at this point is as follows:

[Diagram, summarized as text: the same object graph as in the previous figures (Connection → Transport → MultiChannelPoller → Channel, with Channel.cycle → FairCycle, Channel._queue_cycle → round_robin_cycle and Channel.handlers → {'BRPOP': Channel._brpop_read, 'LISTEN': Channel._receive}, plus Mailbox, Node and Consumer). The addition is an arrow from the user-scope Connection through drain_events, labeled psubscribe, to the redis topic '/0.testMailbox.pidbox'.]


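3.6.2.2 Consumption

As mentioned in the previous subsection, handle_event is where redis is actually read and the message consumed.

Once a message has been received, the following is called:

def _deliver(self, message, queue):
    try:
        callback = self._callbacks[queue]
    except KeyError:
        self._reject_inbound_message(message)
    else:
        callback(message)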

The call stack is as follows:

_deliver, base.py:975
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29

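The variables at this point are shown below. The registered callback is the _callback function defined inside basic_consume:

self._callbacks = {dict: 1}
 'localhost.testMailbox.pidbox' = {function} <function [...]._callback at 0x7fc2522c1840>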

The call chain continues into Consumer.receive, which processes the message:

def receive(self, body, message):
    """Method called when a message is received.

    This dispatches to the registered :attr:`callbacks`.

    Arguments:
        body (Any): The decoded message body.
        message (~kombu.Message): The message instance.

    Raises:
        NotImplementedError: If no consumer callbacks have been
            registered.
    """
    callbacks = self.callbacks
    [callback(body, message) for callback in callbacks]
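To make the dispatch in receive concrete, here is a small sketch under simplifying assumptions: MiniConsumer is a hypothetical stand-in, and it uses a plain for loop where the excerpt uses a list comprehension for its side effects.

```python
class MiniConsumer:
    """Toy consumer that fans one message out to all callbacks."""

    def __init__(self, callbacks=None):
        self.callbacks = list(callbacks or [])

    def receive(self, body, message):
        # Every callback gets the same (body, message) pair,
        # in registration order.
        for callback in self.callbacks:
            callback(body, message)


calls = []
consumer = MiniConsumer([
    lambda body, message: calls.append(('first', body['method'])),
    lambda body, message: calls.append(('second', body['method'])),
])
consumer.receive({'method': 'print_msg'}, message=object())
```

In the example program from section 1.1 only one callback is registered, so the list has a single entry.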

The call stack is:

receive, messaging.py:583
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29

The variables are:

body = {dict: 5} {'method': 'print_msg', 'arguments': {'msg': 'Message for you'}, 'destination': None, 'pattern': None, 'matcher': None}
message = {Message} 
-> bound to chan:1>]>

Finally, the user-supplied callback is reached:

callback, node.py:15
<listcomp>, messaging.py:586
receive, messaging.py:586
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29

That completes the analysis of the consumer side of the mailbox.

0x04 Producer

The Producer is the side that sends the mail, and its logic is much simpler.

The code is:

def main(arguments):
    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox", type="fanout")
    bound = mailbox(connection)
    bound._broadcast("print_msg", {'msg': 'Message for you'})

4.1 Mailbox

Execution is now inside Mailbox. As the code shows, _broadcast mainly calls _publish, and collects replies afterwards only if a reply was requested.

def _broadcast(self, command, arguments=None, destination=None,
               reply=False, timeout=1, limit=None,
               callback=None, channel=None, serializer=None,
               pattern=None, matcher=None):
    arguments = arguments or {}
    reply_ticket = reply and uuid() or None
    chan = channel or self.connection.default_channel

    # Set reply limit to number of destinations (if specified)
    if limit is None and destination:
        limit = destination and len(destination) or None

    serializer = serializer or self.serializer
    self._publish(command, arguments, destination=destination,
                  reply_ticket=reply_ticket,
                  channel=chan,
                  timeout=timeout,
                  serializer=serializer,
                  pattern=pattern,
                  matcher=matcher)

    if reply_ticket:
        return self._collect(reply_ticket, limit=limit,
                             timeout=timeout,
                             callback=callback,
                             channel=chan)
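The two derived values in _broadcast, the reply ticket and the reply limit, are easy to misread because of the "and ... or" idiom. The sketch below restates them with conditional expressions. plan_broadcast is a hypothetical helper, and uuid4 stands in for the uuid() helper used in the excerpt.

```python
from uuid import uuid4


def plan_broadcast(destination=None, reply=False, limit=None):
    """Return (reply_ticket, limit) the way the excerpt derives them."""
    # A ticket is only generated when the caller wants replies.
    reply_ticket = str(uuid4()) if reply else None
    # With no explicit limit, expect one reply per destination.
    if limit is None and destination:
        limit = len(destination)
    return reply_ticket, limit


no_reply = plan_broadcast()
ticket, reply_limit = plan_broadcast(['node1', 'node2'], reply=True)
```

In the example client, reply is left at its default of False, so no ticket is created and _collect is never called.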

The variables are:

arguments = {dict: 1} {'msg': 'Message for you'}
self = {Mailbox} 

Execution continues into _publish. If a reply is required, it first declares the reply queue and adds the ticket and reply_to fields to the message. In either case it then acquires a producer and sends the message.

def _publish(self, type, arguments, destination=None,
             reply_ticket=None, channel=None, timeout=None,
             serializer=None, producer=None, pattern=None, matcher=None):
    message = {'method': type,
               'arguments': arguments,
               'destination': destination,
               'pattern': pattern,
               'matcher': matcher}
    chan = channel or self.connection.default_channel
    exchange = self.exchange
    if reply_ticket:
        maybe_declare(self.reply_queue(channel))
        message.update(ticket=reply_ticket,
                       reply_to={'exchange': self.reply_exchange.name,
                                 'routing_key': self.oid})
    serializer = serializer or self.serializer
    with self.producer_or_acquire(producer, chan) as producer:
        producer.publish(
            message, exchange=exchange.name, declare=[exchange],
            headers={'clock': self.clock.forward(),
                     'expires': time() + timeout if timeout else 0},
            serializer=serializer, retry=True,
        )
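The payload and headers assembled in _publish can be reproduced without a broker. This is a sketch only: build_payload and build_headers are hypothetical helpers, and the now parameter is added here purely so the expiry can be computed deterministically.

```python
from time import time


def build_payload(method, arguments, destination=None, pattern=None,
                  matcher=None, reply_ticket=None, reply_to=None):
    """Build the message dict; reply fields are added only on request."""
    message = {'method': method,
               'arguments': arguments,
               'destination': destination,
               'pattern': pattern,
               'matcher': matcher}
    if reply_ticket:
        message.update(ticket=reply_ticket, reply_to=reply_to)
    return message


def build_headers(clock, timeout, now=None):
    """'expires' is an absolute timestamp, or 0 when there is no timeout."""
    now = time() if now is None else now
    # Parsed as (now + timeout) if timeout else 0, as in the excerpt.
    return {'clock': clock, 'expires': now + timeout if timeout else 0}


payload = build_payload('print_msg', {'msg': 'Message for you'})
headers = build_headers(clock=1, timeout=1, now=100.0)
```

The resulting payload has the same five keys as the message variable shown in the debugger output.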

The variables at this point are:

exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
message = {dict: 5} {'method': 'print_msg', 'arguments': {'msg': 'Message for you'}, 'destination': None, 'pattern': None, 'matcher': None}

4.2 producer

A producer has now been acquired, and it takes over from here.

def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )
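The reply_to handling in the producer is a small normalization step: a queue object is replaced by its name so that only a plain string travels in the message properties. A minimal sketch, assuming a hypothetical Queue stand-in with just a name attribute:

```python
class Queue:
    """Hypothetical stand-in; only the name attribute matters here."""

    def __init__(self, name):
        self.name = name


def normalize_reply_to(properties):
    """Replace a Queue object in 'reply_to' with its name, in place."""
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return properties


from_queue = normalize_reply_to({'reply_to': Queue('reply.testMailbox')})
from_string = normalize_reply_to({'reply_to': 'already-a-name'})
without = normalize_reply_to({})
```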

4.3 Channel

Execution continues into the Channel, which is where Redis starts to be involved.

def basic_publish(self, message, exchange, routing_key, **kwargs):
    """Publish message."""
    self._inplace_augment_message(message, exchange, routing_key)
    if exchange:
        return self.typeof(exchange).deliver(  # this branch is taken
            message, exchange, routing_key, **kwargs
        )
    # anon exchange: routing_key is the destination queue
    return self._put(routing_key, message, **kwargs)
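The branch in basic_publish decides between delivery through a named exchange and a direct put onto a queue. The sketch below models only that decision. RecordingChannel is hypothetical, and it assumes every named exchange is a fanout exchange, which skips the typeof lookup shown in the excerpt.

```python
class RecordingChannel:
    """Toy channel that records which path a publish took."""

    def __init__(self):
        self.fanout_published = []  # (exchange, message) pairs
        self.queue_put = []         # (queue, message) pairs

    def _put_fanout(self, exchange, message, routing_key):
        self.fanout_published.append((exchange, message))

    def _put(self, queue, message):
        self.queue_put.append((queue, message))

    def basic_publish(self, message, exchange, routing_key):
        if exchange:
            # Named exchange: assumed fanout in this sketch.
            return self._put_fanout(exchange, message, routing_key)
        # Anonymous exchange: routing_key names the destination queue.
        return self._put(routing_key, message)


chan = RecordingChannel()
chan.basic_publish('hello', exchange='testMailbox.pidbox', routing_key='')
chan.basic_publish('direct', exchange='', routing_key='some.queue')
```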

4.4 FanoutExchange

The exchange type object delivers the message directly.

class FanoutExchange(ExchangeType):
    """Fanout exchange.

    The `fanout` exchange implements broadcast messaging by delivering
    copies of all messages to all queues bound to the exchange.

    To support fanout the virtual channel needs to store the table
    as shared state.  This requires that the `Channel.supports_fanout`
    attribute is set to true, and the `Channel._queue_bind` and
    `Channel.get_table` methods are implemented.
    """

    type = 'fanout'

    def lookup(self, table, exchange, routing_key, default):
        return {queue for _, _, queue in table}

    def deliver(self, message, exchange, routing_key, **kwargs):
        if self.channel.supports_fanout:
            self.channel._put_fanout(
                exchange, message, routing_key, **kwargs)
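The lookup method treats the binding table as a list of 3-tuples and keeps only the queue element. The sketch below runs that set comprehension on made-up bindings. The table contents are hypothetical, and direct_lookup is written here only for contrast; it is not taken from the excerpt.

```python
def fanout_lookup(table):
    """Every bound queue matches, whatever the routing key is."""
    return {queue for _, _, queue in table}


def direct_lookup(table, routing_key):
    """Contrast case: only bindings whose key equals routing_key match."""
    return {queue for rkey, _, queue in table if rkey == routing_key}


# Hypothetical bindings: (routing_key, pattern, queue)
table = [
    ('', None, 'node1.testMailbox.pidbox'),
    ('', None, 'node2.testMailbox.pidbox'),
    ('other', None, 'node3.testMailbox.pidbox'),
]
all_queues = fanout_lookup(table)
only_other = direct_lookup(table, 'other')
```

Because the result is a set, a queue bound more than once still appears only once.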

4.5 Channel

Control returns to the Channel, which now calls the Redis driver to send the message.

def _put_fanout(self, exchange, message, routing_key, **kwargs):
    """Deliver fanout message."""
    with self.conn_or_acquire() as client:
        client.publish(
            self._get_publish_topic(exchange, routing_key),
            dumps(message),
        )
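_put_fanout needs two things: a topic name and a serialized message. The sketch below shows one way both could be produced. The '/{db}.' prefix is an assumption inferred from the channel value '/0.testMailbox.pidbox' seen in the debugger, publish_topic is a hypothetical helper, and the envelope is a shortened placeholder.

```python
import json


def publish_topic(exchange, db=0, prefix_fmt='/{db}.'):
    """Compose the pub/sub topic (prefix format is an assumption)."""
    return prefix_fmt.format(db=db) + exchange


# Placeholder envelope; the real one carries more fields.
envelope = {'body': '...',
            'content-type': 'application/json',
            'content-encoding': 'utf-8'}

topic = publish_topic('testMailbox.pidbox')
wire = json.dumps(envelope)  # the string handed to the driver
```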

4.6 Redis driver

Finally, the Redis driver sends the message with the PUBLISH command.

def publish(self, channel, message):
    """
    Publish ``message`` on ``channel``.
    Returns the number of subscribers the message was delivered to.
    """
    return self.execute_command('PUBLISH', channel, message)
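The semantics described in the docstring, one copy per subscriber and a return value equal to the number of subscribers reached, can be modeled in memory. MiniPubSub is a hypothetical toy and does not talk to Redis.

```python
from collections import defaultdict


class MiniPubSub:
    """In-memory model of publish/subscribe fan-out."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # channel -> handlers

    def subscribe(self, channel, handler):
        self._subscribers[channel].append(handler)

    def publish(self, channel, message):
        # .get avoids creating an entry for channels nobody listens to.
        handlers = self._subscribers.get(channel, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


inbox_a, inbox_b = [], []
broker = MiniPubSub()
broker.subscribe('/0.testMailbox.pidbox', inbox_a.append)
broker.subscribe('/0.testMailbox.pidbox', inbox_b.append)
delivered = broker.publish('/0.testMailbox.pidbox', 'Message for you')
missed = broker.publish('/0.nobody', 'lost')
```

The second publish illustrates the fire-and-forget nature of pub/sub: with no subscriber on the channel, the message is simply dropped.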

The key variables are shown below (the message string is cut off in the captured output):

channel = {str} '/0.testMailbox.pidbox'
message = {str} '{"body": "eyJtZXRob2QiOiAicHJpbnRfbXNnIiwgImFyZ3VtZW50cyI6IHsibXNnIjogIk1lc3NhZ2UgZm9yIHlvdSJ9LCAiZGVzdGluYXRpb24iOiBudWxsLCAicGF0dGVybiI6IG51bGwsICJtYXRjaGVyIjogbnVsbH0=", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"cloc
self = {Redis} Redis
>>
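The body field in the captured message looks like Base64-encoded JSON. As a quick check, assuming that encoding, the string can be decoded with the standard library alone:

```python
import base64
import json

# The "body" value copied from the debugger output.
encoded_body = (
    "eyJtZXRob2QiOiAicHJpbnRfbXNnIiwgImFyZ3VtZW50cyI6IHsibXNnIjogIk1l"
    "c3NhZ2UgZm9yIHlvdSJ9LCAiZGVzdGluYXRpb24iOiBudWxsLCAicGF0dGVybiI6"
    "IG51bGwsICJtYXRjaGVyIjogbnVsbH0="
)

# Base64 -> UTF-8 JSON text -> Python dict
decoded_text = base64.b64decode(encoded_body).decode('utf-8')
decoded_body = json.loads(decoded_text)
print(decoded_body)
```

The decoded dictionary has the same keys as the body the consumer callback receives in section 3.2.2.2, which ties the two ends of the walkthrough together.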
