
本文共 23139 字,大约阅读时间需要 77 分钟。
[������������] ��������������������������� Celery ��� ������������
0x00 ������
Celery��������������������������������������������������������������������������������������������������������������������������������������������������������������������� Celery ������������������������
Autoscaler ��������� ���������������������������������������������������������������������������������������������������������
0x01 ������������
Celery ������������������������������������������������������������ Kombu ��������������������� broker ��� Redis ������������
- ��� worker ������ ��� ��������� queue ��������������������������������������� queues ������
- ��� worker ��������� broker ��������������� brpop ��������������������������������������������������������� worker ���������������������
- ��� worker ������ broker ��������������������� ������ ������ task ���������worker ������������������������������������������������������������������ worker ���������������������������
��������������������������� worker ��������������������������������� ������������������������������������������������
������������������������������������������ which worker ----> which queue in the worker ----> which subprocess in the worker
���������������
������������������ "worker ������������������������������" ������������������������
1.1 ��������� queue
Kombu ������������������ redis ��� BRPOP ������������������������ queue ���������������������
- Kombu ���������������������������������������������������������queues������������
- queue ������������������������������������������ redis ������������ ������key������ queue ������������������ BRPOP ������������ ��������� key���
- Kombu ������������������������������������������ queues ������ ������ redis ���������������������keys��������������������������� redis keys���
- brpop���������key������������������������ key ��������������������� key ������������������������������������������������������������������������������������������������������������ ������queue ������������������
������ task ��������� ������������ queue���������������������������queue ���������������������������������������
1.1.1 _brpop_start ���������������������queue
Kombu ������������������������������ _brpop_start ������������������������������ ������������������������queues���
_brpop_start ���������
def _brpop_start(self, timeout=1): # ������������������queues queues = self._queue_cycle.consume(len(self.active_queues)) if not queues: return # ������queue���������keys keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps for queue in queues] + [timeout or 0] self._in_poll = self.client.connection self.client.connection.send_command('BRPOP', *keys) # ������������keys������redis������������key���������
���������������������
self.active_queues = {set: 1} {'celery'}len(self.active_queues) = {int} 1 self._queue_cycle = {round_robin_cycle}self = {Channel}
������_brpop_start
��������� self._queue_cycle ���������������������������queue���
������������������
+ Kombu | Redis | |+--------------------------------------------+ || Worker | || | | queue 1 key| +-----------+ | || | queue 1 | | BRPOP(keys) || | queue 2 | keys | || | ...... | +--------+----------------------------------------> queue 2 key| | queue n | ^ | || +-----------+ | keys | || | | || | | | queue 3 key| +-------------+------------+ | || | Keys list | | || | | | || +--------------------------+ | |+--------------------------------------------+ | | | | | | +
1.1.2 round_robin_cycle ��������������������� queue
��������������������������������������� consume ������������ round_robin_cycle ������������ queue������ return self.items[:n]���
��� self.items ��������������������� rotate ��������������������� ������������ ������ queue ������������������������������������ queue ��������������� round robin ���������������
class round_robin_cycle: """Iterator that cycles between items in round-robin.""" def __init__(self, it=None): self.items = it if it is not None else [] def update(self, it): """Update items from iterable.""" self.items[:] = it def consume(self, n): """Consume n items.""" return self.items[:n] def rotate(self, last_used): """Move most recently used item to end of list.""" items = self.items try: items.append(items.pop(items.index(last_used))) except ValueError: pass return last_used
������������������������������������������������������������������ self._queue_cycle.rotate(dest)
���������������
def _brpop_read(self, **options): try: try: dest__item = self.client.parse_response(self.client.connection, 'BRPOP', **options) except self.connection_errors: # if there's a ConnectionError, disconnect so the next # iteration will reconnect automatically. self.client.connection.disconnect() raise if dest__item: dest, item = dest__item dest = bytes_to_str(dest).rsplit(self.sep, 1)[0] self._queue_cycle.rotate(dest) # ������������������ self.connection._deliver(loads(bytes_to_str(item)), dest) return True else: raise Empty() finally: self._in_poll = None
������������������
+ Kombu | Redis | |+--------------------------------------------+ || Worker | || | | queue 1 key| +-----------+ | || | queue 1 | | BRPOP(keys) || | queue 2 | keys | || | ...... | +--------+----------------------------------------> queue 2 key| | queue n | ^ | || +-----------+ | keys | || | | || + | | queue 3 key| round_robin_cycle | || + | || | | || | | || +-------------+------------+ | || | Keys list | | || +--------------------------+ | |+--------------------------------------------+ | | +
1.2 ���������worker
������������ worker ��������������� brpop ������ broker ������������������������������������������������������������������������������ ���������������������redis ������������������������������������������ worker ���������������
��������������������������������������������� spring quartz ������������������������������������
- spring quartz ��� ������������������ ������������������������������������������������������������������������������������������ ���������������
- Kombu ��������� ������ worker ������ redis "���������������������key" ��� ������������ ��������� "��������� worker ������������������������"���
������������������
+ Kombu | Redis | |+--------------------------------------+ || Worker 1 | || | || +-----------+ | || | queue 1 | | BRPOP(keys) || | queue 2 | keys | || | ...... | +--------+-----------------------------+ || | queue n | ^ | | || +-----------+ | keys | | || | | | || + | | || round_robin_cycle | | | +--> queue 1 key| ^ | | | || | | | | || | | | | Single Thread || +------------+---------+ | +---------------------> queue 2 key| | keys list | | | | || +----------------------+ | | | |+--------------------------------------+ | | | | | +--> queue 3 key | |+--------------------------------------+ | || Worker 2 | BRPOP(keys) | || | +---------------+ || | | |+--------------------------------------+ | | | |+--------------------------------------+ BRPOP(keys) | || Worker 3 | | || | +--------------+ || | +| |+--------------------------------------+
1.3 ���������������
������������������������������������������������������������������������������������
1.3.1 ������
��������� strategy������ AsynPool ������������������������������������
class AsynPool(_pool.Pool): """AsyncIO Pool (no threads).""" def __init__(self, processes=None, synack=False, sched_strategy=None, proc_alive_timeout=None, *args, **kwargs): self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy, sched_strategy)
������������������ strategy ������������������������������������������������������������
SCHED_STRATEGY_FCFS = 1 # ���������������SCHED_STRATEGY_FAIR = 4 # ������SCHED_STRATEGIES = { None: SCHED_STRATEGY_FAIR, 'default': SCHED_STRATEGY_FAIR, 'fast': SCHED_STRATEGY_FCFS, 'fcfs': SCHED_STRATEGY_FCFS, 'fair': SCHED_STRATEGY_FAIR,}
1.3.2 ������������
������������������������������������
������������������������������������������������������������������������������������
- Linux ������������������������������������������������������������������CPU������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������, ������������������������������������
- Hadoop ������������������������������������������job������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������task���������������������slot���������������������������������������������������������������������������������CPU���������
- Yarn ���������Fair Share������������Yarn���������������������������������������������������������������������������������������������������������������������������������
1.3.3 ������������ in Celery
��� asynpool���������������������������"��������� fair ������":
is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
������ is_fair_strategy ���������������Celery ���������������������������������
��������� poll ������������������ fair������������ ������ idle worker ��������������������������� idler worker ���������������������
def on_poll_start(): # Determine which io descriptors are not busy inactive = diff(active_writes) # Determine hub_add vs hub_remove strategy conditional if is_fair_strategy: # outbound buffer present and idle workers exist add_cond = outbound and len(busy_workers) < len(all_inqueues) else: # default is add when data exists in outbound buffer add_cond = outbound if add_cond: # calling hub_add vs hub_remove iterate_file_descriptors_safely( inactive, all_inqueues, hub_add, None, WRITE | ERR, consolidate=True) else: iterate_file_descriptors_safely( inactive, all_inqueues, hub_remove)
��������������� ��������� ��������������������������� worker ��������������������������������� task������������������������������������������������������������ ������worker ������������������������
def schedule_writes(ready_fds, total_write_count=None): if not total_write_count: total_write_count = [0] # Schedule write operation to ready file descriptor. # The file descriptor is writable, but that does not # mean the process is currently reading from the socket. # The socket is buffered so writable simply means that # the buffer can accept at least 1 byte of data. # This means we have to cycle between the ready fds. # the first version used shuffle, but this version # using `total_writes % ready_fds` is about 30% faster # with many processes, and also leans more towards fairness # in write stats when used with many processes # [XXX On macOS, this may vary depending # on event loop implementation (i.e, select/poll vs epoll), so # have to test further] num_ready = len(ready_fds) for _ in range(num_ready): ready_fd = ready_fds[total_write_count[0] % num_ready] total_write_count[0] += 1 if ready_fd in active_writes: # already writing to this fd continue if is_fair_strategy and ready_fd in busy_workers: # ������������ # worker is already busy with another task continue if ready_fd not in all_inqueues: hub_remove(ready_fd) continue
���������������������
+ Kombu | Redis | BRPOP(keys) |+------------------------------------+ || Worker 1 | +---------------+ || | | |+------------------------------------+ | | queue 1 key | | +-> | | |+------------------------------------+ BRPOP(keys) | | Single thread || Worker 2 | +--------------------------------------> queue 2 key| | | | (which worker) |+------------------------------------+ | | | | | |+------------------------------------+ | | +-> queue 3 key| Worker 3 | | || | | || +-----------+ | | || | queue 1 | | BRPOP(keys) | || | queue 2 | keys | | || | ...... | +--------+-------------------------+ || | queue n | ^ | || +-----------+ | keys | || | | || + | || round_robin_cycle (which queues) || ^ | || | | || | | || +----+----+ | || + |keys list| | || | +---------+ | |+------------------------------------+ | | | | fair_strategy(which subprocess) | | | +-------+----------+----------------+ | | | | | v v v |+-----+--------+ +------+-------+ +-----+--------+ || subprocess 1 | | subprocess 2 | | subprocess 3 | ++--------------+ +--------------+ +--------------+
0x02 Autoscaler
Autoscaler ��������� ���������������������������������������������������������������������������������������������������������
2.1 ������������
��� WorkerComponent ��������������������� AutoScaler ������������������������������
- ��������� consumer ���������������������������������������������������������������������������
- ������ Hub ��� call_repeatedly ������������������������������������������������������������������
������������������������������������������������
class WorkerComponent(bootsteps.StartStopStep): """Bootstep that starts the autoscaler thread/timer in the worker.""" def create(self, w): scaler = w.autoscaler = self.instantiate( w.autoscaler_cls, w.pool, w.max_concurrency, w.min_concurrency, worker=w, mutex=DummyLock() if w.use_eventloop else None, ) return scaler if not w.use_eventloop else None def register_with_event_loop(self, w, hub): w.consumer.on_task_message.add(w.autoscaler.maybe_scale) # ������������������������������������������ hub.call_repeatedly( # ������������������������������ w.autoscaler.keepalive, w.autoscaler.maybe_scale, )
2.2 ������������
2.2.1 bgThread
Autoscaler ���Background thread��������� AutoScaler���������������������������
class bgThread(threading.Thread): """Background service thread.""" def run(self): body = self.body shutdown_set = self._is_shutdown.is_set try: while not shutdown_set(): body() finally: self._set_stopped()
2.2.2 ������
Autoscaler ������������������������������������������������������������������������������
- ���������������������������������������������������
- ���������������������������������������
- ���������������������������������������������������������������������������������������������������������������������������������������������
class Autoscaler(bgThread): """Background thread to autoscale pool workers.""" def __init__(self, pool, max_concurrency, min_concurrency=0, worker=None, keepalive=AUTOSCALE_KEEPALIVE, mutex=None): super().__init__() self.pool = pool self.mutex = mutex or threading.Lock() self.max_concurrency = max_concurrency self.min_concurrency = min_concurrency self.keepalive = keepalive self._last_scale_up = None self.worker = worker def body(self): with self.mutex: self.maybe_scale() sleep(1.0) def _maybe_scale(self, req=None): procs = self.processes cur = min(self.qty, self.max_concurrency) if cur > procs: self.scale_up(cur - procs) return True cur = max(self.qty, self.min_concurrency) if cur < procs: self.scale_down(procs - cur) return True def maybe_scale(self, req=None): if self._maybe_scale(req): self.pool.maintain_pool() def update(self, max=None, min=None): with self.mutex: if max is not None: if max < self.processes: self._shrink(self.processes - max) self._update_consumer_prefetch_count(max) self.max_concurrency = max if min is not None: if min > self.processes: self._grow(min - self.processes) self.min_concurrency = min return self.max_concurrency, self.min_concurrency def scale_up(self, n): self._last_scale_up = monotonic() return self._grow(n) def scale_down(self, n): if self._last_scale_up and ( monotonic() - self._last_scale_up > self.keepalive): return self._shrink(n) def _grow(self, n): self.pool.grow(n) def _shrink(self, n): self.pool.shrink(n) def _update_consumer_prefetch_count(self, new_max): diff = new_max - self.max_concurrency if diff: self.worker.consumer._update_prefetch_count( diff ) @property def qty(self): return len(state.reserved_requests) @property def processes(self): return self.pool.num_processes
0xEE ������������
������������������������������������������������������������������
������������������������������������
������������������������������������������������������������������������������������������������������������������������
0xFF ������
发表评论
最新留言
关于作者
