[源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat
发布日期:2021-05-18 10:02:45 浏览次数:19 分类:精选文章

本文共 5902 字,大约阅读时间需要 19 分钟。

Celery 是一个灵活且可靠的分布式任务队列系统,专注于实时处理异步任务,同时支持任务调度。以下将详细分析 Celery 中 Timer 和 Heart 组件的实现原理及其在系统中的作用。

Blueprint

Celery worker 的初始化过程由 Blueprint 类管理,内部各子模块按特定依赖关系执行。Worker 的 Blueprint 包括 Hub、Pool、Beat、Timer 等关键组件。其中,Timer 和 Heart 是 Worker 的两大核心组件,特别是在后续分析中占据重要位置。

Timer Step

Timer 组件根据当前 Worker 是否使用事件循环机制,决定创建哪种类型的定时器。具体来说:

- 如果使用 eventloop(如 gevent 或 eventlet),则采用 kombu.asynchronous.timer.Timer,其定时等待由事件循环处理。- 否则,使用 Pool 内部自定义的 Timer 类(即 timer2.Timer),由 Timer 自己启动线程进行定时等待。

在代码中,可以看到 Timer 组件检查 Worker 是否使用事件循环:

```pythonfrom kombu.asynchronous.timer import Timer as _Timer

class Timer(bootsteps.Step):def create(self, w):if w.use_eventloop:w.timer = _Timer(max_interval=10.0)else:if not w.timer_cls:w.timer_cls = w.pool_cls.Timerw.timer = self.instantiate(w.timer_cls, max_interval=w.timer_precision,on_error=self.on_timer_error,on_tick=self.on_timer_tick)

其中,timer2 的引入是为了适应不支持事件循环的 Transport(如 MongoDB)инов的情况。

Transport

Celery 依赖 Kombu 对各种 Message Queue 不同实现(如 Redis、RabbitMQ、MongoDB 等)进行抽象处理。Transport 是 Kombu 中对具体 Message Queue 连接的抽象,其职责是实现消息的生产和消费。

- Redis 和 RabbitMQ 等支持 AMQP 的 Broker 通常采用 thread-less 和 lock-free 的实现,减少了资源消耗。- 其他 Transport(如 MongoDB)则采用 thread-based 实现。

Kombu 中的 Transport 负责具体的消息操作,而不是直接使用 Connection。在 Redis Transport 中,Transport 会通过 event-loop 来管理定时任务,避免了直接使用线程或其他 blocking 操作。

Thread-less vs Thread-based

不同 Transport 的实现方式影响了 Timer 的选择: - Thread-less Transport(如 Redis)使用 Kombu 的 Timer 类,该类基于事件循环机制,不需要额外的线程处理定时任务。- Thread-based Transport(如 MongoDB)则使用 timer2.Timer,该类通过线程独立地执行定时任务。

Kombu 的设计注重性能和资源优化,选择适合特定 Transport 的定时任务处理方式。在 Redis 中,Timer 基于 event-loop,而在 MongoDB 中,Timer 则独立于 event-loop。

Timer in Pool

Timer 可以在不同的 Pool 实现中使用,具体取决于 Worker 组件的配置。 - gevent 和 eventlet Pool 则使用 Kombu 的 Timer 组件。- BasePool 及其他类型 Pool 则使用自定义的 timer2.Timer。

gevent 和 eventlet 这类库基于绿色协程模型,它们使用 Kombu 的 Timer 类,通过事件循环机制处理定时任务。而 BasePool 则依赖 timer2,通过线程独立地执行定时任务。

kombu.Timer

kombu.asynchronous.timer.Timer 是一种异步定的实现。它通过返回 (wait_seconds, entry) 对来驱动调用者进行等待操作,调用者必须在 next_entry().

```pythondef call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0): tref = self.Entry(fun, args, kwargs) @wraps(fun) def _reschedules(*args, **kwargs): last, now = tref._last_run, monotonic() lsince = (now - tref._last_run) if last else secs try: if lsince and lsince >= secs: tref._last_run = now return fun(*args, **kwargs) finally: if not tref.canceled: last = tref._last_run next = secs - (now - last) if last else secs self.enter_after(next, tref, priority) tref.fun = _reschedules tref._last_run = now return self.enter_after(secs, tref, priority)

调用 Timer 的方法是通过 call_repeatedly 扩展用户函数到 Timer 实例中,并在 enter_after 方法中安排定时任务执行。

实验

通过示例代码可以看出,Timer 和 Hub 调用方式的核心逻辑。以下是一个完整的示例代码:

```pythondef main(): hub = Hub() exchange = Exchange('asynt') queue = Queue('asynt', exchange, 'asynt')
def send_message(conn):    producer = Producer(conn)    producer.publish('hello world', exchange=exchange, routing_key='asynt')    print('message sent')def on_message(message):    print('received: {0!r}'.format(message.body))    message.ack()    # Uncomment next line to stop after one message    # hub.stop()conn = Connection('redis://localhost:6379')conn.register_with_event_loop(hub)with Consumer(conn, [queue], on_message=on_message, auto_ack=False) as consumer:    send_message(conn)    hub.timer.call_repeatedly(3, p_message)    hub.run_forever()return main if __name__ == '__main__' else None

该代码中,Hub 使用 Timer 组件进行消息循环。当 Timer 调用用户定义的函数(如 p_message)时,函数会被执行,并且会重复执行每隔 3 秒。

timer2

timer2 是 Celery 中专门为不支持 AsyncIO 的 Transport(如 MongoDB)设计的 Timer 实现。它通过继承 threading.Thread 实现定时等待。其代码如下:

```pythonfrom kombu.asynchronous.timer import Entryfrom threading import Threadimport timeclass Timer(Thread): Note: This is only used for transports not supporting AsyncIO. Entry = Entry Schedule = Scheduler running = False on_tick = None timer_count = count(1) def run(self): self.running = True self.scheduler = iter(self.schedule) while not self._is_shutdown.isSet(): delay = self._next_entry() if delay: if self.on_tick: self.on_tick(delay) if sleep is None: break sleep(delay) try: self._is_stopped.set() except TypeError: # The interpreter was shut down, modules may be in process of garbage collection pass except Exception as exc: sys.stderr.flush() os._exit(1)

Heart

Heart 组件用于向其他 Worker 发送心跳消息,确认 Worker 还在线。其工作流程如下:

1. 当 Worker 启动时,Heart 组件被激活,发送 worker-online 消息。2. 定期调用 Heartbeat 调用用户函数(如 _send)。

Heart 组件的实现涉及 EventDispatcher,用于群发心跳消息。代码如下:

```pythonclass Heart: def __init__(self, timer, eventer, interval=2.0): self.timer = timer self.eventer = eventer self.interval = interval self.tref = None self._send_sent_signal = None
def _send(self, event, retry=True):    if self._send_sent_signal is not None:        self._send_sent_signal(sender=self)    return self.eventer.send(        event, freq=self.interval,        active=len(active_requests),        processed=all_total_count[0],        loadavg=load_average(),        retry=retry,        **SOFTWARE_INFO    )def start(self):    if self.eventer.enabled:        self.tref = self.timer.call_repeatedly(            self.interval, self._send,            ('worker-heartbeat',)        )

在 Worker 停止时,Heart 组件会发送 worker-offline 消息:

```pythondef stop(self): if self.tref is not None: self.timer.cancel(self.tref) self.tref = None if self.eventer.enabled: self._send('worker-offline', retry=False)

通过 Heart 组件,Celery 实现了 Worker 的在线状态跟踪和心跳机制。

总之,Celery 的 Timer 和 Heart 组件为分布式任务队列系统提供了关键的定时调度功能,确保了系统的高效运行和可靠性。

上一篇:记一次CDH6.3.2集群日志数据清理指南
下一篇:华为RH2288H服务器引导ServiceCD安装Windows Server操作系统

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2025年04月19日 16时44分09秒