[源码解析] 并行分布式任务队列 Celery 之 负载均衡
发布日期:2021-05-14 19:08:19 浏览次数:20 分类:博客文章

本文共 23139 字,大约阅读时间需要 77 分钟。

[������������] ��������������������������� Celery ��� ������������

������

0x00 ������

Celery��������������������������������������������������������������������������������������������������������������������������������������������������������������������� Celery ������������������������

Autoscaler ��������� ���������������������������������������������������������������������������������������������������������

0x01 ������������

Celery ������������������������������������������������������������ Kombu ��������������������� broker ��� Redis ������������

  • ��� worker ������ ��� ��������� queue ��������������������������������������� queues ������
  • ��� worker ��������� broker ��������������� brpop ��������������������������������������������������������� worker ���������������������
  • ��� worker ������ broker ��������������������� ������ ������ task ���������worker ������������������������������������������������������������������ worker ���������������������������

��������������������������� worker ��������������������������������� ������������������������������������������������

������������������������������������������ which worker ----> which queue in the worker ----> which subprocess in the worker ���������������

������������������ "worker ������������������������������" ������������������������

1.1 ��������� queue

Kombu ������������������ redis ��� BRPOP ������������������������ queue ���������������������

  • Kombu ���������������������������������������������������������queues������������
  • queue ������������������������������������������ redis ������������ ������key������ queue ������������������ BRPOP ������������ ��������� key���
  • Kombu ������������������������������������������ queues ������ ������ redis ���������������������keys��������������������������� redis keys���
  • brpop���������key������������������������ key ��������������������� key ������������������������������������������������������������������������������������������������������������ ������queue ������������������

������ task ��������� ������������ queue���������������������������queue ���������������������������������������

1.1.1 _brpop_start ���������������������queue

Kombu ������������������������������ _brpop_start ������������������������������ ������������������������queues���

_brpop_start ���������

def _brpop_start(self, timeout=1):    # ������������������queues    queues = self._queue_cycle.consume(len(self.active_queues))    if not queues:        return    # ������queue���������keys      keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps            for queue in queues] + [timeout or 0]    self._in_poll = self.client.connection    self.client.connection.send_command('BRPOP', *keys) # ������������keys������redis������������key���������

���������������������

self.active_queues = {set: 1} {'celery'}len(self.active_queues) = {int} 1  self._queue_cycle = {round_robin_cycle} 
self = {Channel}

������_brpop_start ��������� self._queue_cycle ���������������������������queue���

������������������

+                                                  Kombu       |          Redis                                                              |                                                              |+--------------------------------------------+                ||  Worker                                    |                ||                                            |                |           queue 1 key|   +-----------+                            |                ||   | queue 1   |                            |   BRPOP(keys)  ||   | queue 2   |                    keys    |                ||   | ......    | +--------+---------------------------------------->     queue 2 key|   | queue n   |          ^                 |                ||   +-----------+          | keys            |                ||                          |                 |                ||                          |                 |                |           queue 3 key|            +-------------+------------+    |                ||            |         Keys list        |    |                ||            |                          |    |                ||            +--------------------------+    |                |+--------------------------------------------+                |                                                              |                                                              |                                                              |                                                              |                                                              |                                                              +

1.1.2 round_robin_cycle ��������������������� queue

��������������������������������������� consume ������������ round_robin_cycle ������������ queue������ return self.items[:n]���

��� self.items ��������������������� rotate ��������������������� ������������ ������ queue ������������������������������������ queue ��������������� round robin ���������������

class round_robin_cycle:    """Iterator that cycles between items in round-robin."""    def __init__(self, it=None):        self.items = it if it is not None else []    def update(self, it):        """Update items from iterable."""        self.items[:] = it    def consume(self, n):        """Consume n items."""        return self.items[:n]    def rotate(self, last_used):        """Move most recently used item to end of list."""        items = self.items        try:            items.append(items.pop(items.index(last_used)))        except ValueError:            pass        return last_used

������������������������������������������������������������������ self._queue_cycle.rotate(dest) ���������������

def _brpop_read(self, **options):        try:            try:                dest__item = self.client.parse_response(self.client.connection,                                                        'BRPOP',                                                        **options)            except self.connection_errors:                # if there's a ConnectionError, disconnect so the next                # iteration will reconnect automatically.                self.client.connection.disconnect()                raise            if dest__item:                dest, item = dest__item                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]                self._queue_cycle.rotate(dest) # ������������������                self.connection._deliver(loads(bytes_to_str(item)), dest)                return True            else:                raise Empty()        finally:            self._in_poll = None

������������������

+                                                  Kombu       |          Redis                                                              |                                                              |+--------------------------------------------+                ||  Worker                                    |                ||                                            |                |           queue 1 key|   +-----------+                            |                ||   | queue 1   |                            |   BRPOP(keys)  ||   | queue 2   |                    keys    |                ||   | ......    | +--------+---------------------------------------->     queue 2 key|   | queue n   |          ^                 |                ||   +-----------+          | keys            |                ||                          |                 |                ||                          +                 |                |           queue 3 key|                 round_robin_cycle          |                ||                          +                 |                ||                          |                 |                ||                          |                 |                ||            +-------------+------------+    |                ||            |         Keys list        |    |                ||            +--------------------------+    |                |+--------------------------------------------+                |                                                              |                                                              +

1.2 ���������worker

������������ worker ��������������� brpop ������ broker ������������������������������������������������������������������������������ ���������������������redis ������������������������������������������ worker ���������������

��������������������������������������������� spring quartz ������������������������������������

  • spring quartz ��� ������������������ ������������������������������������������������������������������������������������������ ���������������
  • Kombu ��������� ������ worker ������ redis "���������������������key" ��� ������������ ��������� "��������� worker ������������������������"���

������������������

+                                            Kombu           |    Redis                                                            |                                                            |+--------------------------------------+                    ||  Worker 1                            |                    ||                                      |                    ||   +-----------+                      |                    ||   | queue 1   |                      |   BRPOP(keys)      ||   | queue 2   |              keys    |                    ||   | ......    | +--------+-----------------------------+  ||   | queue n   |          ^           |                 |  ||   +-----------+          |  keys     |                 |  ||                          |           |                 |  ||                          +           |                 |  ||                  round_robin_cycle   |                 |  |               +--> queue 1 key|                          ^           |                 |  |               ||                          |           |                 |  |               ||                          |           |                 |  | Single Thread ||             +------------+---------+ |                 +---------------------> queue 2 key|             |     keys list        | |                 |  |               ||             +----------------------+ |                 |  |               |+--------------------------------------+                 |  |               |                                                         |  |               +--> queue 3 key                                                         |  |+--------------------------------------+                 |  || Worker 2                             |   BRPOP(keys)   |  ||                                      | +---------------+  ||                                      |                 |  |+--------------------------------------+                 |  |                                                         |  |+--------------------------------------+   BRPOP(keys)   |  || Worker 3                             |                 |  ||                                      |  +--------------+  ||                                      |                    +|                                      |+--------------------------------------+

1.3 ���������������

������������������������������������������������������������������������������������

1.3.1 ������

��������� strategy������ AsynPool ������������������������������������

class AsynPool(_pool.Pool):    """AsyncIO Pool (no threads)."""    def __init__(self, processes=None, synack=False,                 sched_strategy=None, proc_alive_timeout=None,                 *args, **kwargs):        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,                                                   sched_strategy)

������������������ strategy ������������������������������������������������������������

SCHED_STRATEGY_FCFS = 1 # ���������������SCHED_STRATEGY_FAIR = 4 # ������SCHED_STRATEGIES = {    None: SCHED_STRATEGY_FAIR,    'default': SCHED_STRATEGY_FAIR,    'fast': SCHED_STRATEGY_FCFS,    'fcfs': SCHED_STRATEGY_FCFS,    'fair': SCHED_STRATEGY_FAIR,}

1.3.2 ������������

������������������������������������

������������������������������������������������������������������������������������

  • Linux ������������������������������������������������������������������CPU������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������, ������������������������������������
  • Hadoop ������������������������������������������job������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������task���������������������slot���������������������������������������������������������������������������������CPU���������
  • Yarn ���������Fair Share������������Yarn���������������������������������������������������������������������������������������������������������������������������������

1.3.3 ������������ in Celery

��� asynpool���������������������������"��������� fair ������":

is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR

������ is_fair_strategy ���������������Celery ���������������������������������

��������� poll ������������������ fair������������ ������ idle worker ��������������������������� idler worker ���������������������

def on_poll_start():    # Determine which io descriptors are not busy    inactive = diff(active_writes)    # Determine hub_add vs hub_remove strategy conditional    if is_fair_strategy:        # outbound buffer present and idle workers exist        add_cond = outbound and len(busy_workers) < len(all_inqueues)    else:  # default is add when data exists in outbound buffer        add_cond = outbound    if add_cond:  # calling hub_add vs hub_remove        iterate_file_descriptors_safely(            inactive, all_inqueues, hub_add,            None, WRITE | ERR, consolidate=True)    else:        iterate_file_descriptors_safely(            inactive, all_inqueues, hub_remove)

��������������� ��������� ��������������������������� worker ��������������������������������� task������������������������������������������������������������ ������worker ������������������������

def schedule_writes(ready_fds, total_write_count=None):            if not total_write_count:                total_write_count = [0]            # Schedule write operation to ready file descriptor.            # The file descriptor is writable, but that does not            # mean the process is currently reading from the socket.            # The socket is buffered so writable simply means that            # the buffer can accept at least 1 byte of data.            # This means we have to cycle between the ready fds.            # the first version used shuffle, but this version            # using `total_writes % ready_fds` is about 30% faster            # with many processes, and also leans more towards fairness            # in write stats when used with many processes            # [XXX On macOS, this may vary depending            # on event loop implementation (i.e, select/poll vs epoll), so            # have to test further]            num_ready = len(ready_fds)            for _ in range(num_ready):                ready_fd = ready_fds[total_write_count[0] % num_ready]                total_write_count[0] += 1                if ready_fd in active_writes:                    # already writing to this fd                    continue                 if is_fair_strategy and ready_fd in busy_workers: # ������������                    # worker is already busy with another task                    continue                if ready_fd not in all_inqueues:                    hub_remove(ready_fd)                    continue

���������������������

+                                            Kombu         |    Redis                                                          |                                         BRPOP(keys)      |+------------------------------------+                    ||  Worker 1                          | +---------------+  ||                                    |                 |  |+------------------------------------+                 |  |                     queue 1 key                                                       |  |                 +->                                                       |  |                 |+------------------------------------+     BRPOP(keys) |  |  Single thread  || Worker 2                           | +--------------------------------------> queue 2 key|                                    |                 |  |  (which worker) |+------------------------------------+                 |  |                 |                                                       |  |                 |+------------------------------------+                 |  |                 +-> queue 3 key| Worker 3                           |                 |  ||                                    |                 |  ||     +-----------+                  |                 |  ||     | queue 1   |                  |     BRPOP(keys) |  ||     | queue 2   |          keys    |                 |  ||     | ......    | +--------+-------------------------+  ||     | queue n   |          ^       |                    ||     +-----------+          | keys  |                    ||                            |       |                    ||                            +       |                    ||                 round_robin_cycle (which queues)        ||                            ^       |                    ||                            |       |                    ||                            |       |                    ||                       +----+----+  |                    ||             +         |keys list|  |                    ||             |         +---------+  |                    |+------------------------------------+                    |              |                                           |              |  fair_strategy(which subprocess)          |              |                                           |      +-------+----------+----------------+               |      |                  |                |               |      v                  v                v               |+-----+--------+  +------+-------+  +-----+--------+      || subprocess 1 |  | subprocess 2 |  | subprocess 3 |      ++--------------+  +--------------+  +--------------+

0x02 Autoscaler

Autoscaler ��������� ���������������������������������������������������������������������������������������������������������

2.1 ������������

��� WorkerComponent ��������������������� AutoScaler ������������������������������

  • ��������� consumer ���������������������������������������������������������������������������
  • ������ Hub ��� call_repeatedly ������������������������������������������������������������������

������������������������������������������������

class WorkerComponent(bootsteps.StartStopStep):    """Bootstep that starts the autoscaler thread/timer in the worker."""    def create(self, w):        scaler = w.autoscaler = self.instantiate(            w.autoscaler_cls,            w.pool, w.max_concurrency, w.min_concurrency,            worker=w, mutex=DummyLock() if w.use_eventloop else None,        )        return scaler if not w.use_eventloop else None    def register_with_event_loop(self, w, hub):        w.consumer.on_task_message.add(w.autoscaler.maybe_scale) # ������������������������������������������                hub.call_repeatedly( # ������������������������������            w.autoscaler.keepalive, w.autoscaler.maybe_scale,        )

2.2 ������������

2.2.1 bgThread

Autoscaler ���Background thread��������� AutoScaler���������������������������

class bgThread(threading.Thread):    """Background service thread."""    def run(self):        body = self.body        shutdown_set = self._is_shutdown.is_set        try:            while not shutdown_set():            	body()        finally:            self._set_stopped()

2.2.2 ������

Autoscaler ������������������������������������������������������������������������������

  • ���������������������������������������������������
  • ���������������������������������������
  • ���������������������������������������������������������������������������������������������������������������������������������������������
class Autoscaler(bgThread):    """Background thread to autoscale pool workers."""    def __init__(self, pool, max_concurrency,                 min_concurrency=0, worker=None,                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):        super().__init__()        self.pool = pool        self.mutex = mutex or threading.Lock()        self.max_concurrency = max_concurrency        self.min_concurrency = min_concurrency        self.keepalive = keepalive        self._last_scale_up = None        self.worker = worker    def body(self):        with self.mutex:            self.maybe_scale()        sleep(1.0)    def _maybe_scale(self, req=None):        procs = self.processes        cur = min(self.qty, self.max_concurrency)        if cur > procs:            self.scale_up(cur - procs)            return True        cur = max(self.qty, self.min_concurrency)        if cur < procs:            self.scale_down(procs - cur)            return True    def maybe_scale(self, req=None):        if self._maybe_scale(req):            self.pool.maintain_pool()    def update(self, max=None, min=None):        with self.mutex:            if max is not None:                if max < self.processes:                    self._shrink(self.processes - max)                self._update_consumer_prefetch_count(max)                self.max_concurrency = max            if min is not None:                if min > self.processes:                    self._grow(min - self.processes)                self.min_concurrency = min            return self.max_concurrency, self.min_concurrency    def scale_up(self, n):        self._last_scale_up = monotonic()        return self._grow(n)    def scale_down(self, n):        if self._last_scale_up and (                monotonic() - self._last_scale_up > self.keepalive):            return self._shrink(n)    def _grow(self, n):        self.pool.grow(n)    def _shrink(self, n):		self.pool.shrink(n)    def _update_consumer_prefetch_count(self, new_max):        diff = new_max - self.max_concurrency        if diff:            self.worker.consumer._update_prefetch_count(                diff            )    @property    def qty(self):        return len(state.reserved_requests)    @property    def processes(self):        return self.pool.num_processes

0xEE ������������

������������������������������������������������������������������

������������������������������������

������������������������������������������������������������������������������������������������������������������������

0xFF ������

上一篇:performSelector系列方法的研究
下一篇:push notification通知分组

发表评论

最新留言

不错!
[***.144.177.141]2025年05月04日 10时51分30秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章