[源码分析] 消息队列 Kombu 之 启动过程
发布日期:2021-05-09 06:16:43 浏览次数:14 分类:精选文章

本文共 36445 字,大约阅读时间需要 121 分钟。

[源码分析] 消息队列 Kombu 之 启动过程

0x00 摘要

本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 是如何启动,以及如何搭建一个基本的架子。

因为之前有一个综述,所以大家会发现,一些概念讲解文字会同时出现在后续文章和综述之中。

0x01 示例

下面使用如下代码来进行说明。

本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。

def main(arguments):    hub = Hub()    exchange = Exchange('asynt_exchange')    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')    def send_message(conn):        producer = Producer(conn)        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')        print('message sent')    def on_message(message):        print('received: {0!r}'.format(message.body))        message.ack()        # hub.stop()  # <-- exit after one message    conn = Connection('redis://localhost:6379')    conn.register_with_event_loop(hub)    def p_message():        print(' kombu ')    with Consumer(conn, [queue], on_message=on_message):        send_message(conn)        hub.timer.call_repeatedly(3, p_message)        hub.run_forever()if __name__ == '__main__':    sys.exit(main(sys.argv[1:]))

0x02 启动

让我们顺着程序流程看看Kombu都做了些什么,也可以对 Kombu 内部有所了解。

本文关注的重点是:Connection,Channel 和 Hub 是如何联系在一起的

2.1 Hub

在程序开始,我们建立了Hub。

Hub的作用是建立消息Loop,但是此时尚未建立,因此只是一个静态实例。

hub = Hub()

其定义如下:

class Hub:    """Event loop object.    Arguments:        timer (kombu.asynchronous.Timer): Specify custom timer instance.    """    def __init__(self, timer=None):        self.timer = timer if timer is not None else Timer()        self.readers = {}        self.writers = {}        self.on_tick = set()        self.on_close = set()        self._ready = set()        self._running = False        self._loop = None        self.consolidate = set()        self.consolidate_callback = None        self.propagate_errors = ()        self._create_poller()

因为此时没有建立loop,所以目前重要的步骤是建立Poll,其Stack如下:

_get_poller, eventio.py:321poll, eventio.py:328_create_poller, hub.py:113__init__, hub.py:96main, testUb.py:22
, testUb.py:55

在eventio.py中有如下,我们可以看到Kombu可以使用多种模型来进行内核消息处理:

def _get_poller():    if detect_environment() != 'default':        # greenlet        return _select    elif epoll:        # Py2.6+ Linux        return _epoll    elif kqueue and 'netbsd' in sys.platform:        return _kqueue    elif xpoll:        return _poll    else:        return _select

因为本机情况,这里选择的是:_poll。

+------------------+| Hub              ||                  ||                  |            +-------------+|      poller +---------------> | _poll       ||                  |            |             |         +-------+|                  |            |    _poller+---------> |  poll |+------------------+            |             |         +-------+                                +-------------+

2.2 Exchange与Queue

其次建立了Exchange与Queue。

  • Exchange:交换机,消息发送者将消息发至 Exchange,Exchange 负责将消息分发至 Queue;
  • Queue:消息队列,存储着即将被应用消费掉的消息,Exchange 负责将消息分发 Queue,消费者从 Queue 接收消息;

因为此时也没有具体消息,所以我们暂且无法探究Exchange机制。

exchange = Exchange('asynt')queue = Queue('asynt', exchange, 'asynt')

此时将把Exchange与Queue联系起来。图示如下:

+------------------+| Hub              ||                  ||                  |            +-------------+|      poller +---------------> | _poll       ||                  |            |             |         +-------+|                  |            |    _poller+---------> |  poll |+------------------+            |             |         +-------+                                +-------------++----------------+         +-------------------+| Exchange       |         | Queue             ||                |         |                   ||                |         |                   ||     channel    | <------------+ exchange     ||                |         |                   ||                |         |                   |+----------------+         +-------------------+

2.3 Connection

第三步就是建立Connection。

Connection是对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接。现在就是对'redis://localhost:6379'连接进行抽象。

conn = Connection('redis://localhost:6379')

2.3.1 定义

由定义注释可知,Connection是到broker的连接。从具体代码可以看出,Connection更接近是一个逻辑概念,具体功能都委托给别人完成。

消息从来不直接发送给队列,甚至 Producers 都可能不知道队列的存在。 Producer如何才能将消息发送给Consumer呢?这中间需要经过 Message Broker 的处理和传递。

AMQP中,承担 Message Broker 功能的就是 AMQP Server。也正是从这个角度讲,AMQP 的 Producer 和Consumer 都是 AMQP Client。

在Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker

Connection主要成员变量是,但是此时没有赋值:

  • _connection:
  • _transport:就是上面提到的对 broker 的抽象。
  • cycle:与broker交互的调度策略。
  • failover_strategy:在连接失效时,选取其他hosts的策略。
  • heartbeat:用来实施心跳。

代码如下:

class Connection:    """A connection to the broker"""    port = None    virtual_host = '/'    connect_timeout = 5    _connection = None    _default_channel = None    _transport = None    uri_prefix = None    #: The cache of declared entities is per connection,    #: in case the server loses data.    declared_entities = None    #: Iterator returning the next broker URL to try in the event    #: of connection failure (initialized by :attr:`failover_strategy`).    cycle = None    #: Additional transport specific options,    #: passed on to the transport instance.    transport_options = None    #: Strategy used to select new hosts when reconnecting after connection    #: failure.  One of "round-robin", "shuffle" or any custom iterator    #: constantly yielding new URLs to try.    failover_strategy = 'round-robin'    #: Heartbeat value, currently only supported by the py-amqp transport.    heartbeat = None    resolve_aliases = resolve_aliases    failover_strategies = failover_strategies    hostname = userid = password = ssl = login_method = None

2.3.2 init 与 transport

Connection内部主要任务是建立了transport。

Stack大致如下:

Transport, redis.py:1039
, redis.py:1031import_module, __init__.py:126symbol_by_name, imports.py:56resolve_transport, __init__.py:70get_transport_cls, __init__.py:85__init__, connection.py:183main, testUb.py:40
, testUb.py:55

2.4 Transport

在Kombu体系中,用transport对所有的broker进行了抽象,为不同的broker提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。

Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例。就是存储和发送消息的实体,用来区分底层消息队列是用amqp、Redis还是其它实现的。

Transport负责具体操作,但是很多操作移交给 loop 与 MultiChannelPoller 进行。

2.4.1 定义

其主要成员变量为:

  • 本transport的驱动类型,名字;
  • 对应的 Channel;
  • cycle:MultiChannelPoller,具体下文提到;

定义如下:

class Transport(virtual.Transport):    """Redis Transport."""    Channel = Channel    polling_interval = None  # disable sleep between unsuccessful polls.    default_port = DEFAULT_PORT    driver_type = 'redis'    driver_name = 'redis'    implements = virtual.Transport.implements.extend(        asynchronous=True,        exchange_type=frozenset(['direct', 'topic', 'fanout'])    )    def __init__(self, *args, **kwargs):        if redis is None:            raise ImportError('Missing redis library (pip install redis)')        super().__init__(*args, **kwargs)        # Get redis-py exceptions.        self.connection_errors, self.channel_errors = self._get_errors()        # All channels share the same poller.        self.cycle = MultiChannelPoller()

2.4.2 移交操作

Transport负责具体操作,但是很多操作移交给 loop 与 MultiChannelPoller 进行,具体从下面代码可见。

def register_with_event_loop(self, connection, loop):    cycle = self.cycle    cycle.on_poll_init(loop.poller)    cycle_poll_start = cycle.on_poll_start    add_reader = loop.add_reader    on_readable = self.on_readable    def _on_disconnect(connection):        if connection._sock:            loop.remove(connection._sock)    cycle._on_connection_disconnect = _on_disconnect    def on_poll_start():        cycle_poll_start()        [add_reader(fd, on_readable, fd) for fd in cycle.fds]            loop.on_tick.add(on_poll_start)    loop.call_repeatedly(10, cycle.maybe_restore_messages)        health_check_interval = connection.client.transport_options.get(        'health_check_interval',        DEFAULT_HEALTH_CHECK_INTERVAL    )        loop.call_repeatedly(        health_check_interval,        cycle.maybe_check_subclient_health    )

其中重点是MultiChannelPoller。一个Connection有一个Transport, 一个Transport有一个MultiChannelPoller,对poll操作都是由MultiChannelPoller完成,redis操作由channel完成

2.4.3 MultiChannelPoller

定义如下,可以理解为执行engine,主要作用是:

  • 收集channel;
  • 建立fd到channel的映射;
  • 建立channel到socks的映射;
  • 使用poll;
class MultiChannelPoller:    """Async I/O poller for Redis transport."""    eventflags = READ | ERR    def __init__(self):        # active channels        self._channels = set()        # file descriptor -> channel map.        self._fd_to_chan = {}        # channel -> socket map        self._chan_to_sock = {}        # poll implementation (epoll/kqueue/select)        self.poller = poll()        # one-shot callbacks called after reading from socket.        self.after_read = set()

2.4.4 获取

Transport是预先生成的,若需要,则依据名字取得。

TRANSPORT_ALIASES = {    'amqp': 'kombu.transport.pyamqp:Transport',    'amqps': 'kombu.transport.pyamqp:SSLTransport',    'pyamqp': 'kombu.transport.pyamqp:Transport',    'librabbitmq': 'kombu.transport.librabbitmq:Transport',    'memory': 'kombu.transport.memory:Transport',    'redis': 'kombu.transport.redis:Transport',	......    'pyro': 'kombu.transport.pyro:Transport'}_transport_cache = {}def resolve_transport(transport=None):    """Get transport by name. """    if isinstance(transport, str):        try:            transport = TRANSPORT_ALIASES[transport]        except KeyError:            if '.' not in transport and ':' not in transport:                from kombu.utils.text import fmatch_best                alt = fmatch_best(transport, TRANSPORT_ALIASES)        else:            if callable(transport):                transport = transport()        return symbol_by_name(transport)    return transportdef get_transport_cls(transport=None):    """Get transport class by name.    """    if transport not in _transport_cache:        _transport_cache[transport] = resolve_transport(transport)    return _transport_cache[transport]

此时Connection数据如下,注意其部分成员变量尚且没有意义:

conn = {Connection} 
alt = {list: 0} [] connect_timeout = {int} 5 connection = {Transport}
cycle = {NoneType} None declared_entities = {set: 0} set() default_channel = {Channel}
failover_strategies = {dict: 2} {'round-robin':
, 'shuffle':
} failover_strategy = {type}
heartbeat = {int} 0 host = {str} 'localhost:6379' hostname = {str} 'localhost' manager = {Management}
port = {int} 6379 recoverable_channel_errors = {tuple: 0} () resolve_aliases = {dict: 2} {'pyamqp': 'amqp', 'librabbitmq': 'amqp'} transport = {Transport}
transport_cls = {str} 'redis' uri_prefix = {NoneType} None userid = {NoneType} None virtual_host = {str} '/'

至此,Kombu的基本就建立完成,但是彼此之间没有建立逻辑联系

所以此时示例如下,注意此时三者没有联系:

+-------------------+       +---------------------+       +--------------------+| Connection        |       | redis.Transport     |       | MultiChannelPoller ||                   |       |                     |       |                    ||                   |       |                     |       |     _channels      ||                   |       |        cycle +------------> |     _fd_to_chan    ||     transport +---------> |                     |       |     _chan_to_sock  ||                   |       |                     |       |     poller         |+-------------------+       +---------------------+       |     after_read     |                                                          |                    |                                                          +--------------------++------------------+| Hub              ||                  ||                  |            +-------------+|      poller +---------------> | _poll       ||                  |            |             |         +-------+|                  |            |    _poller+---------> |  poll |+------------------+            |             |         +-------+                                +-------------++----------------+         +-------------------+| Exchange       |         | Queue             ||                |         |                   ||                |         |                   ||     channel    | <------------+ exchange     ||                |         |                   ||                |         |                   |+----------------+         +-------------------+

0x03 Connection注册hub

之前我们提到,基本架子已经建立起来,但是各个模块之间彼此没有联系,下面我们就看看如何建立联系

示例代码来到:

conn.register_with_event_loop(hub)

这里进行了注册,此时作用是把hub与Connection联系起来。随之调用到:

def register_with_event_loop(self, loop):    self.transport.register_with_event_loop(self.connection, loop)

进而调用到transport类:<kombu.transport.redis.Transport object at 0x7fd23e962dd8>

具体代码如下:

def register_with_event_loop(self, connection, loop):    cycle = self.cycle    cycle.on_poll_init(loop.poller)# 这里建立联系,loop就是hub    cycle_poll_start = cycle.on_poll_start    add_reader = loop.add_reader    on_readable = self.on_readable    def _on_disconnect(connection):        if connection._sock:            loop.remove(connection._sock)    cycle._on_connection_disconnect = _on_disconnect    def on_poll_start():        cycle_poll_start()        [add_reader(fd, on_readable, fd) for fd in cycle.fds]            loop.on_tick.add(on_poll_start)    loop.call_repeatedly(10, cycle.maybe_restore_messages)        health_check_interval = connection.client.transport_options.get(        'health_check_interval',        DEFAULT_HEALTH_CHECK_INTERVAL    )        loop.call_repeatedly(        health_check_interval,        cycle.maybe_check_subclient_health    )

3.1 建立Channel

注册最初是建立Channel。这里有一个连接的动作,就是在这里,建立了Channel。

@propertydef connection(self):    """The underlying connection object"""    if not self._closed:        if not self.connected:            return self._ensure_connection(                max_retries=1, reraise_as_library_errors=False            )        return self._connection

具体建立是在 base.py 中完成,这是 Transport 基类。Stack 如下:

create_channel, base.py:920establish_connection, base.py:938_establish_connection, connection.py:801_connection_factory, connection.py:866retry_over_time, functional.py:325_ensure_connection, connection.py:439connection, connection.py:859register_with_event_loop, connection.py:266main, testUb.py:41
, testUb.py:55

3.2 Channel

Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接。就是真正的连接

可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll

为了更好的说明,我们提前给出这个通讯流程大约如下:

+---------------------------------------------------------------------------------------------------------------------------------------+            |                                     +--------------+                                   6                       parse_response         |            |                                +--> | Linux Kernel | +---+                                                                            |            |                                |    +--------------+     |                                                                            |            |                                |                         |                                                                            |            |                                |                         |  event                                                                     |            |                                |  1                      |                                                                            |            |                                |                         |  2                                                                         |            |                                |                         |                                                                            |    +-------+---+    socket                  +                         |                                                                            |    |   redis   | <------------> port +-->  fd +--->+                  v                                                                            |    |           |                                   |           +------+--------+                                                                   |    |           |    socket                         |           |  Hub          |                                                                   |    |           | <------------> port +-->  fd +--->----------> |               |                                                                   |    | port=6379 |                                   |           |               |                                                                   |    |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |    |           | <------------> port +-->  fd +--->+           |               |                     +                                             |    +-----------+                                               +---------------+                     |                                             |                                                                                                      |                                             |                                                        3                                             |                                             |             +----------------------------------------------------------------------------------------+                                             |             |                                                                                                                                      v             |                                                                                                                                                  _receive_callback             |                                                                                                                            5    +-------------+                      +-----------++------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  ||       Transport   |                     |  MultiChannelPoller     |      +------>  channel . handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+|                   |                     |                         |      |                                                                                           8                ||                   | on_readable(fileno) |                         |      |                                                                         ^                                  ||           cycle +---------------------> |          _fd_to_chan +---------------->  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  ||                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  ||  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |  9|          +        |                     +-------------------------+      +------>  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |+-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |           |                                                                                                                                         |                                  v           |                                                7           _callback                                                                    |           +-----------------------------------------------------------------------------------------------------------------------------------------+                            User Function

手机上如下:

3.2.1 定义

Channel 主要成员是:

  • async_pool :redis异步连接池;
  • pool :redis连接池;
  • channel_id :Channel ID;
  • client :就是StrictRedis之类的driver;
  • connection :对应的Transport;
  • cycle = {FairCycle} <FairCycle: 0/0 []>
  • queue_order_strategy :获取queue的策略;
  • state :BrokerState状态;
  • subclient :PubSub所用的client;
    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) :bing用到的key;

比如_get_client可以看出来client。

def _get_client(self):    if redis.VERSION < (3, 2, 0):        raise VersionMismatch(            'Redis transport requires redis-py versions 3.2.0 or later. '            'You have {0.__version__}'.format(redis))    return redis.StrictRedis

简化版定义如下:

class Channel(virtual.Channel):    """Redis Channel."""    QoS = QoS    _client = None    _subclient = None    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)    keyprefix_fanout = '/{db}.'    sep = '\x06\x16'    _fanout_queues = {}    unacked_key = '{p}unacked'.format(p=KEY_PREFIX)    unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)    unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)    unacked_mutex_expire = 300  # 5 minutes    unacked_restore_limit = None    visibility_timeout = 3600   # 1 hour    max_connections = 10    queue_order_strategy = 'round_robin'    _async_pool = None    _pool = None    from_transport_options = (        virtual.Channel.from_transport_options +        ('sep',         'ack_emulation',         'unacked_key',		 ......         'max_connections',         'health_check_interval',         'retry_on_timeout',         'priority_steps')  # <-- do not add comma here!    )    connection_class = redis.Connection if redis else None

3.2.2 基类

基类定义如下:

class Channel(AbstractChannel, base.StdChannel):    """Virtual channel.    Arguments:        connection (ConnectionT): The transport instance this            channel is part of.    """    #: message class used.    Message = Message    #: QoS class used.    QoS = QoS    #: flag to restore unacked messages when channel    #: goes out of scope.    do_restore = True    #: mapping of exchange types and corresponding classes.    exchange_types = dict(STANDARD_EXCHANGE_TYPES)    #: flag set if the channel supports fanout exchanges.    supports_fanout = False    #: Binary <-> ASCII codecs.    codecs = {'base64': Base64()}    #: Default body encoding.    #: NOTE: ``transport_options['body_encoding']`` will override this value.    body_encoding = 'base64'    #: counter used to generate delivery tags for this channel.    _delivery_tags = count(1)    #: Optional queue where messages with no route is delivered.    #: Set by ``transport_options['deadletter_queue']``.    deadletter_queue = None    # List of options to transfer from :attr:`transport_options`.    from_transport_options = ('body_encoding', 'deadletter_queue')    # Priority defaults    default_priority = 0    min_priority = 0    max_priority = 9

最终具体举例如下:

self = {Channel} 
Client = {type}
Message = {type}
QoS = {type}
active_fanout_queues = {set: 0} set() active_queues = {set: 0} set() async_pool = {ConnectionPool} ConnectionPool
> auto_delete_queues = {set: 0} set() channel_id = {int} 1 client = {Redis} Redis
>> codecs = {dict: 1} {'base64':
} connection = {Transport}
connection_class = {type}
cycle = {FairCycle}
deadletter_queue = {NoneType} None exchange_types = {dict: 3} {'direct':
, 'topic':
, handlers = {dict: 2} {'BRPOP':
>, 'LISTEN':
>} pool = {ConnectionPool} ConnectionPool
> qos = {QoS}
queue_order_strategy = {str} 'round_robin' state = {BrokerState}
subclient = {PubSub}

3.2.3 redis消息回调函数

关于上面成员变量,这里需要说明的是

handlers = {dict: 2}   {    'BRPOP': 
>, 'LISTEN':
> }

这是redis有消息时的回调函数,即:

  • BPROP 有消息时候,调用 Channel._brpop_read;
  • LISTEN 有消息时候,调用 Channel._receive;

3.2.4 Redis 直接相关的主要成员

与Redis 直接相关的成员定义在:redis/client.py。

与 Redis 直接相关的主要成员是如下,会利用如下变量进行具体 redis操作:

  • async_pool :redis异步连接池;
  • pool :redis连接池;
  • client :就是StrictRedis之类的driver;
  • subclient :PubSub所用的client;

分别对应如下类型:

channel = {Channel} 
Client = {type}
async_pool = {ConnectionPool} ConnectionPool
> client = {Redis} Redis
>> connection = {Transport}
connection_class = {type}
connection_class_ssl = {type}
pool = {ConnectionPool} ConnectionPool
> subclient = {PubSub}

具体代码如下:

def _create_client(self, asynchronous=False):    if asynchronous:        return self.Client(connection_pool=self.async_pool)    return self.Client(connection_pool=self.pool)def _get_pool(self, asynchronous=False):    params = self._connparams(asynchronous=asynchronous)    self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])    return redis.ConnectionPool(**params)def _get_client(self):    if redis.VERSION < (3, 2, 0):        raise VersionMismatch(            'Redis transport requires redis-py versions 3.2.0 or later. '            'You have {0.__version__}'.format(redis))    return redis.StrictRedis@propertydef pool(self):    if self._pool is None:        self._pool = self._get_pool()    return self._pool@propertydef async_pool(self):    if self._async_pool is None:        self._async_pool = self._get_pool(asynchronous=True)    return self._async_pool@cached_propertydef client(self):    """Client used to publish messages, BRPOP etc."""    return self._create_client(asynchronous=True)@cached_propertydef subclient(self):    """Pub/Sub connection used to consume fanout queues."""    client = self._create_client(asynchronous=True)    return client.pubsub()

因为添加了Channel,所以此时如下:

+-----------------+| Channel         ||                 |      +-----------------------------------------------------------+|    client  +---------> | Redis
|| | +-----------------------------------------------------------+| || | +---------------------------------------------------+-+| pool +----------> |ConnectionPool
|| | +---------------------------------------------------+-+| || || || connection || |+-----------------++-------------------+ +---------------------+ +--------------------+| Connection | | redis.Transport | | MultiChannelPoller || | | | | || | | | | _channels || | | cycle +------------> | _fd_to_chan || transport +---------> | | | _chan_to_sock || | | | | poller |+-------------------+ +---------------------+ | after_read | | |+------------------+ +--------------------+| Hub || || | +-------------+| poller +---------------> | _poll || | | | +-------+| | | _poller+---------> | poll |+------------------+ | | +-------+ +-------------++----------------+ +-------------------+| Exchange | | Queue || | | || | | || channel | <------------+ exchange || | | || | | |+----------------+ +-------------------+

3.3 channel 与 Connection 联系

讲到这里,基本道理大家都懂,但是具体两者之间如何联系,我们需要再剖析下。

3.3.1 从Connection得到channel

在Connection定义中有如下,原来 Connection 是通过 transport 来得到 channel:

def channel(self):    """Create and return a new channel."""    self._debug('create channel')    chan = self.transport.create_channel(self.connection)    return chan

3.3.2 Transport具体创建

在Transport之中有:

def create_channel(self, connection):    try:        return self._avail_channels.pop()    except IndexError:        channel = self.Channel(connection)        self.channels.append(channel)        return channel

原来在 Transport 有两个channels 列表:

self._avail_channelsself.channels

如果_avail_channels 有内容则直接获取,否则生成一个新的Channel。

在真正连接时候,会调用 establish_connection 放入self._avail_channels。

def establish_connection(self):    # creates channel to verify connection.    # this channel is then used as the next requested channel.    # (returned by ``create_channel``).    self._avail_channels.append(self.create_channel(self))    return self     # for drain events

其堆栈如下:

__init__, redis.py:557create_channel, base.py:921establish_connection, base.py:939_establish_connection, connection.py:801_connection_factory, connection.py:866retry_over_time, functional.py:313_ensure_connection, connection.py:439connection, connection.py:859channel, connection.py:283
, node.py:11

3.3.3 建立联系

在init中有:

def __init__(self, *args, **kwargs):    super().__init__(*args, **kwargs)    if not self.ack_emulation:  # disable visibility timeout        self.QoS = virtual.QoS    self._queue_cycle = cycle_by_name(self.queue_order_strategy)()    self.Client = self._get_client()    self.ResponseError = self._get_response_error()    self.active_fanout_queues = set()    self.auto_delete_queues = set()    self._fanout_to_queue = {}    self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}     ......    self.connection.cycle.add(self)  # add to channel poller.    if register_after_fork is not None:        register_after_fork(self, _after_fork_cleanup_channel)

重点是:

self.connection.cycle.add(self)  # add to channel poller.

这就是把 Channel与Transport 中的 poller 联系起来,这样Transport可以利用Channel去与真实的redis进行交互

堆栈如下:

add, redis.py:277__init__, redis.py:531create_channel, base.py:920establish_connection, base.py:938_establish_connection, connection.py:801_connection_factory, connection.py:866retry_over_time, functional.py:325_ensure_connection, connection.py:439connection, connection.py:859register_with_event_loop, connection.py:266main, testUb.py:41

因为已经联系起来,所以此时如下:

+-----------------+| Channel         ||                 |      +-----------------------------------------------------------+|    client  +---------> | Redis
|| | +-----------------------------------------------------------+| || | +---------------------------------------------------+-+| pool +----------> |ConnectionPool
|| | +---------------------------------------------------+-+| || | <------------------------------------------------------------+| | || connection +---------------+ || | | |+-----------------+ | | v |+-------------------+ +---+-----------------+ +--------------------+ || Connection | | redis.Transport | | MultiChannelPoller | || | | | | | || | | | | _channels +--------+| | | cycle +------------> | _fd_to_chan || transport +---------> | | | _chan_to_sock || | | | | poller |+-------------------+ +---------------------+ | after_read | | |+------------------+ +--------------------+| Hub || || | +-------------+| poller +---------------> | _poll || | | | +-------+| | | _poller+---------> | poll |+------------------+ | | +-------+ +-------------++----------------+ +-------------------+| Exchange | | Queue || | | || | | || channel | <------------+ exchange || | | || | | |+----------------+ +-------------------+

3.3 Transport 与 Hub 联系

on_poll_init 这里就是把 kombu.transport.redis.Transport 与 Hub 联系起来。

self.poller = poller把Transport与Hub的poll联系起来。这样 Transport 就可以利用 poll。

def on_poll_init(self, poller):    self.poller = poller    for channel in self._channels:        return channel.qos.restore_visible(            num=channel.unacked_restore_limit,        )

此时变量如下:

poller = {_poll} 
self = {MultiChannelPoller}
after_read = {set: 0} set() eventflags = {int} 25 fds = {dict: 0} {} poller = {_poll}

因此,我们最终如下:

+-----------------+| Channel         ||                 |      +-----------------------------------------------------------+|    client  +---------> | Redis
|| | +-----------------------------------------------------------+| || | +---------------------------------------------------+-+| pool +----------> |ConnectionPool
|| | +---------------------------------------------------+-+| || | <------------------------------------------------------------+| | || connection +---------------+ || | | |+-----------------+ | | v |+-------------------+ +---+-----------------+ +--------------------+ || Connection | | redis.Transport | | MultiChannelPoller | || | | | | | || | | | | _channels +--------+| | | cycle +------------> | _fd_to_chan || transport +---------> | | | _chan_to_sock || | | | +<----+ poller |+-------------------+ +---------------------+ | | after_read | | | |+------------------+ +--------------+ +--------------------+| Hub | || | v| | +-------+-----+| poller +---------------> | _poll || | | | +-------+| | | _poller+---------> | poll |+------------------+ | | +-------+ +-------------++----------------+ +-------------------+| Exchange | | Queue || | | || | | || channel | <------------+ exchange || | | || | | |+----------------+ +-------------------+

0x04 总结

具体如图,可以看出来,上面三个基本模块已经联系到了一起。

可以看到,

  • 目前是以Transport为中心,把 Channel代表的真实 redis 与 Hub其中的poll联系起来,但是具体如何使用则尚未得知。
  • 用户是通过Connection来作为API入口,connection可以得到Transport。

既然基本架构已经搭好,所以从下文开始,我们看看 Consumer 是如何运作的。

0xFF 参考

上一篇:[源码分析] 消息队列 Kombu 之 Consumer
下一篇:[源码解析] 消息队列 Kombu 之 基本架构

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2025年05月12日 20时05分45秒