[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件
发布日期:2021-05-11 07:56:31 浏览次数:8 分类:博客文章

本文共 38190 字,大约阅读时间需要 127 分钟。

[������������] ��������������������������� Celery ��� EventDispatcher & Event ������

������

0x00 ������

Celery���������������������������������������������������������������������������������������������������������������������������������������������������������

������������ EventDispatcher ��� Event ������ ���������������

0x01 ������

EventDispatcher ��� Event ������������ Celery ���������������Event���������������

���������������������������EventDispatcher ���������������������������Event������������������������������������������������������

  • ������������ ���������������������������������EventDispatcher ������������ ������������������
  • ��������������������������������������������� broker ���������������������
  • ������ Celery ��������������� Kombu������ Kombu ������������������������������������������������������������������������������������������
  • Kombu ������������ Mailbox ������������������������������������ Mailbox ������������������������������������������������������������������������������������ ��� ���������

���������������������������������EventDispatcher ������������ kombu ��� producer, consumer ������ Mailbox���

��� Events ������������������Event���������������������������������������������

  • Events ������ Kombu ��������������������� ���������
  • ��������������������������������������� Celery ������������������������������������������ State ���������

������������������������������������������������

���������������������������������������������������������������������

0x02 ������

EventDispatcher ���������������celery\events\dispatcher.py���

������������������������������������������������������������������������������������������

  • connection (kombu.Connection) ������������������ Broker ������������������������
  • channel (kombu.Channel) : Channel ���������������������������Connection���������������������������������������������������
    • Connection ��� AMQP ��� ������������������
    • Channel ��� AMQP ��� MQ ���������������������
    • ��������� "������redis������������������" ���������Channel ��������������� redis ��������������������������������� Channel ������������ redis ������������������������������������������ redis ��������������������������������������� socket��������� socket ������������ file������������ file ������������ poll���
  • producer ��������������������������� kombu producer ���������
  • exchange ���������������������������������������������������Exchange���������Exchange���������������������������������������������������
  • hostname : ��������������������������� EventDispatcher ���������������������������������������
  • groups ���������������������
  • _outbound_buffer ������������������
  • clock ���Lamport ���������������������������������������������������������������������������������������

������������������������

class EventDispatcher:    """Dispatches event messages.    """    DISABLED_TRANSPORTS = {'sql'}    app = None    def __init__(self, connection=None, hostname=None, enabled=True,                 channel=None, buffer_while_offline=True, app=None,                 serializer=None, groups=None, delivery_mode=1,                 buffer_group=None, buffer_limit=24, on_send_buffered=None):        self.app = app_or_default(app or self.app)        self.connection = connection        self.channel = channel        self.hostname = hostname or anon_nodename()        self.buffer_while_offline = buffer_while_offline        self.buffer_group = buffer_group or frozenset()        self.buffer_limit = buffer_limit        self.on_send_buffered = on_send_buffered        self._group_buffer = defaultdict(list)        self.mutex = threading.Lock()        self.producer = None        self._outbound_buffer = deque()        self.serializer = serializer or self.app.conf.event_serializer        self.on_enabled = set()        self.on_disabled = set()        self.groups = set(groups or [])        self.tzoffset = [-time.timezone, -time.altzone]        self.clock = self.app.clock        self.delivery_mode = delivery_mode        if not connection and channel:            self.connection = channel.connection.client        self.enabled = enabled        conninfo = self.connection or self.app.connection_for_write()        self.exchange = get_exchange(conninfo,                                     name=self.app.conf.event_exchange)        if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:            self.enabled = False        if self.enabled:            self.enable()        self.headers = {'hostname': self.hostname}        self.pid = os.getpid()

������������������������������������������������������������������

self = {EventDispatcher} 
DISABLED_TRANSPORTS = {set: 1} {'sql'} app = {Celery}
buffer_group = {frozenset: 0} frozenset() buffer_limit = {int} 24 buffer_while_offline = {bool} True channel = {NoneType} None clock = {LamportClock} 0 connection = {Connection}
delivery_mode = {int} 1 enabled = {bool} True exchange = {Exchange} Exchange celeryev(fanout) groups = {set: 1} {'worker'} headers = {dict: 1} {'hostname': 'celery@DESKTOP-0GO3RPO'} hostname = {str} 'celery@DESKTOP-0GO3RPO' mutex = {lock}
on_disabled = {set: 1} {
>} on_enabled = {set: 1} {
>} on_send_buffered = {NoneType} None pid = {int} 26144 producer = {Producer}
> publisher = {Producer}
> serializer = {str} 'json' tzoffset = {list: 2} [28800, 32400] _group_buffer = {defaultdict: 0} defaultdict(
, {}) _outbound_buffer = {deque: 0} deque([])

0x03 Producer

���������������EventDispatcher ��������������� Kombu ��� Producer��������� Celery ������������ ampq ��� Kombu ��������������������������������������������������������� Producer���

���������������������������

  • Connection��������������������������������������� Redis���

  • Exchange������������������������������ Queue���

������������������������������

3.1 Connection

������������������������Connection ��������������� Celery ��� connection_for_write

conninfo = self.connection or self.app.connection_for_write()

������������������

connection = {Connection} 
conninfo = {Connection}

3.2 Exchange

Exchange ���������������

  • Exchange������������ ������ ���������������������������������������Exchange���Exchange���������������������������������
  • Queue���������������������������������������������������������������Exchange���������������������Queue���������������Queue���������������

���������������Exchange ���������������������������������exchange���exchange���������������queue������

������������������������������ routing_key ��� binding_key������������������binding_key ���consumer ���������������������������������������������

������������������routing-key��������� message ������������ binding-key���������queue ��������� exchange ���������������������

������������������������exchange���������������������direct���topic���fanout������������������������RabbitMQ������exchange������������������������������������exchages���������������������������������������������������������������������

������������������������

def get_exchange(conn, name=EVENT_EXCHANGE_NAME):    """Get exchange used for sending events.    Arguments:        conn (kombu.Connection): Connection used for sending/receiving events.        name (str): Name of the exchange. Default is ``celeryev``.    Note:        The event type changes if Redis is used as the transport        (from topic -> fanout).    """    ex = copy(event_exchange)    if conn.transport.driver_type == 'redis':        # quick hack for Issue #436        ex.type = 'fanout'    if name != ex.name:        ex.name = name    return ex

������������������

EVENT_EXCHANGE_NAME = 'celeryev'    self.exchange = {Exchange} Exchange celeryev(fanout)

������������������������������������ Exchange ������������ celeryev(fanout) ���������

3.3 ������

��������������������������������� Producer���

def enable(self):        self.producer = Producer(self.channel or self.connection,                                 exchange=self.exchange,                                 serializer=self.serializer,                                 auto_declare=False)        self.enabled = True        for callback in self.on_enabled:            callback()

0x04 ������������

��������������� Producer���������������������������������

4.1 Send ������

���������������������������������������������������

  • ���������������������������������������������������������������������������
  • ��������������������� Producer publish API ���������

������������������������������������������������

groups, group = self.groups, group_from(type)

������������������

group = {str} 'worker'groups = {set: 1} {'worker'}type = {str} 'worker-online'

���������������������������

def send(self, type, blind=False, utcoffset=utcoffset, retry=False,             retry_policy=None, Event=Event, **fields):        """Send event.        """        if self.enabled:            groups, group = self.groups, group_from(type)            if groups and group not in groups:                return            if group in self.buffer_group:                clock = self.clock.forward()                event = Event(type, hostname=self.hostname,                              utcoffset=utcoffset(),                              pid=self.pid, clock=clock, **fields)                buf = self._group_buffer[group]                buf.append(event)                if len(buf) >= self.buffer_limit:                    self.flush()                elif self.on_send_buffered:                    self.on_send_buffered()            else:                return self.publish(type, fields, self.producer, blind=blind,                                    Event=Event, retry=retry,                                    retry_policy=retry_policy)

4.2 publish ��� broker ������

send ���������������������

��������������� routing_key ���

routing_key=type.replace('-', '.')

���������������routing_key ��� 'worker.online'���

������������ Event���

event = {dict: 13}  'hostname' = {str} 'celery@DESKTOP-0GO3RPO' 'utcoffset' = {int} -8 'pid' = {int} 24320 'clock' = {int} 1 'freq' = {float} 2.0 'active' = {int} 0 'processed' = {int} 0 'loadavg' = {tuple: 3} (0.0, 0.0, 0.0) 'sw_ident' = {str} 'py-celery' 'sw_ver' = {str} '5.0.5' 'sw_sys' = {str} 'Windows' 'timestamp' = {float} 1611464767.3456059 'type' = {str} 'worker-online' __len__ = {int} 13

publish ���������������

def publish(self, type, fields, producer,                blind=False, Event=Event, **kwargs):        """Publish event using custom :class:`~kombu.Producer`.        Arguments:            type (str): Event type name, with group separated by dash (`-`).                fields: Dictionary of event fields, must be json serializable.            producer (kombu.Producer): Producer instance to use:                only the ``publish`` method will be called.            retry (bool): Retry in the event of connection failure.            retry_policy (Mapping): Map of custom retry policy options.                See :meth:`~kombu.Connection.ensure`.            blind (bool): Don't set logical clock value (also don't forward                the internal logical clock).            Event (Callable): Event type used to create event.                Defaults to :func:`Event`.            utcoffset (Callable): Function returning the current                utc offset in hours.        """        clock = None if blind else self.clock.forward()        event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),                      pid=self.pid, clock=clock, **fields)        with self.mutex:            return self._publish(event, producer,                                 routing_key=type.replace('-', '.'), **kwargs)    def _publish(self, event, producer, routing_key, retry=False,                 retry_policy=None, utcoffset=utcoffset):        exchange = self.exchange        try:            producer.publish(                event,                routing_key=routing_key,                exchange=exchange.name,                retry=retry,                retry_policy=retry_policy,                declare=[exchange],                serializer=self.serializer,                headers=self.headers,                delivery_mode=self.delivery_mode,            )        except Exception as exc:  # pylint: disable=broad-except            if not self.buffer_while_offline:                raise            self._outbound_buffer.append((event, routing_key, exc))

��������� pubsub������������������ redis ������������������������������

������redis������������������������������������

redis-cli.exe -p 6379127.0.0.1:6379> keys *1) "_kombu.binding.celery.pidbox"2) "_kombu.binding.celery"3) "_kombu.binding.celeryev"127.0.0.1:6379> smembers _kombu.binding.celeryev 1) "worker.#\x06\x16\x06\x16celeryev.64089900-d397-4564-b343-742664c1b214"127.0.0.1:6379> smembers _kombu.binding.celery1) "celery\x06\x16\x06\x16celery"127.0.0.1:6379> smembers _kombu.binding.celery.pidbox1) "\x06\x16\x06\x16celery@DESKTOP-0GO3RPO.celery.pidbox"127.0.0.1:6379>

���������EventDispatcher ������������������������������������

������������������������������������������������ Events ���������

0x05 Events ������

5.1 Event ������������

���������������Celery ��� Task/Worker ������������������������������������������ Event������������������������������������������������ Event ������������������ Celery ������������������������ WebUI ��������������� flower ������������ Event��������������������������������������������������������������������������������������� Event ������ Task ��������������������������� Task ������������������������������������������������������������������������������������������������������������������������������������������������������

  • ��� Task ���������������������
  • ��� Task ���������������������������
  • ������ Celery���Worker/Task��� ������������������

5.2 ������

Celery Events ������������������������������������������������dump������������������

���������

celery -A proj events -c myapp.DumpCam --frequency=2.0celery -A proj events --camera=
--frequency=1.0celery -A proj events --dump

������������������������������������������������

app.start(argv=['events'])

������������������������

def events(ctx, dump, camera, detach, frequency, maxrate, loglevel, **kwargs):    """Event-stream utilities."""    app = ctx.obj.app    if dump:        return _run_evdump(app)    if camera:        return _run_evcam(camera, app=app, freq=frequency, maxrate=maxrate,                          loglevel=loglevel,                          detach=detach,                          **kwargs)    return _run_evtop(app)

5.3 ������

Events������������

def _run_evtop(app):    try:        from celery.events.cursesmon import evtop        _set_process_status('top')        return evtop(app=app)

���������������������

def evtop(app=None):  # pragma: no cover    """Start curses monitor."""    app = app_or_default(app)    state = app.events.State()    display = CursesMonitor(state, app)    display.init_screen()    refresher = DisplayThread(display)    refresher.start()       capture_events(app, state, display)

5.4 ������������

������������������������������

��������������������� app.events.Receiver���

������������������ Receiver ��������� handlers={'*': state.event}������������������������������������������������

def capture_events(app, state, display):  # pragma: no cover    while 1:        with app.connection_for_read() as conn:            try:                conn.ensure_connection(on_connection_error,                                       app.conf.broker_connection_max_retries)                                recv = app.events.Receiver(conn, handlers={'*': state.event})                                display.resetscreen()                display.init_screen()                                recv.capture()                            except conn.connection_errors + conn.channel_errors as exc:                print(f'Connection lost: {exc!r}', file=sys.stderr)

��������������������������� recv.capture()���

���������������

Events   +--------------------+   |      loop          |   |                    |   |                    |   |                    |   |                    |   |                    v   |   |        EventReceiver.capture()   |   |                    +   |                    |   |                    |   |                    |   |                    |   |                    |   |                    |   +--------------------+

5.5 EventReceiver

EventReceiver ������������������Event������������������������������������������EventReceiver ��������� ConsumerMixin���

class EventReceiver(ConsumerMixin):    """Capture events.    Arguments:        connection (kombu.Connection): Connection to the broker.        handlers (Mapping[Callable]): Event handlers.            This is  a map of event type names and their handlers.            The special handler `"*"` captures all events that don't have a            handler.    """

������������������

def capture(self, limit=None, timeout=None, wakeup=True):        """Open up a consumer capturing events.        This has to run in the main process, and it will never stop        unless :attr:`EventDispatcher.should_stop` is set to True, or        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.        """        for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup):            pass

���������������������

self.consume = {method} 
>self = {EventReceiver}

��������������������� ConsumerMixin ������������������������������������������������������������������������ kombu . producer ��������������� kombu . consumer���

������������������������ EventReceiver ��������������� Connection��������� ConsumerMixin ������������������ Receiver��������� Receiver ��������������������� Event��������������������������������������� routing_key ������������������������

���������������

Events   +--------------------+   |      loop          |   |                    |   |                    |   |                    |   |                    |   |                    v   |   |     EventReceiver(ConsumerMixin).capture()   |   |                    +   |                    |   |                    |   |                    |   |                    |   |                    |   |                    |   +--------------------+

5.6 ConsumerMixin

ConsumerMixin ��� Kombu ��������� ��������������������������������������������� Consumer Programs���

class ConsumerMixin:    """Convenience mixin for implementing consumer programs.    It can be used outside of threads, with threads, or greenthreads    (eventlet/gevent) too.    The basic class would need a :attr:`connection` attribute    which must be a :class:`~kombu.Connection` instance,    and define a :meth:`get_consumers` method that returns a list    of :class:`kombu.Consumer` instances to use.    Supporting multiple consumers is important so that multiple    channels can be used for different QoS requirements.	"""

��������� ���kombu\mixins.py

def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):        elapsed = 0        with self.consumer_context(**kwargs) as (conn, channel, consumers):            for i in limit and range(limit) or count():                if self.should_stop:                    break                self.on_iteration()                try:                    conn.drain_events(timeout=safety_interval)                except socket.timeout:                    conn.heartbeat_check()                    elapsed += safety_interval                    if timeout and elapsed >= timeout:                        raise                except OSError:                    if not self.should_stop:                        raise                else:                    yield                    elapsed = 0

5.6.1 Consumer

ConsumerMixin ������������ Consumer���������

@contextmanager    def Consumer(self):        with self.establish_connection() as conn:            self.on_connection_revived()            channel = conn.default_channel            cls = partial(Consumer, channel,                          on_decode_error=self.on_decode_error)            with self._consume_from(*self.get_consumers(cls, channel)) as c:                yield conn, channel, c            self.on_consume_end(conn, channel)

��� ������������������������self._receive��������� Consumer callback���

def get_consumers(self, Consumer, channel):        return [Consumer(queues=[self.queue],                         callbacks=[self._receive], no_ack=True,                         accept=self.accept)]

������������

get_consumers, receiver.py:72Consumer, mixins.py:230__enter__, contextlib.py:112consumer_context, mixins.py:181__enter__, contextlib.py:112consume, mixins.py:188capture, receiver.py:91evdump, dumper.py:95_run_evdump, events.py:21events, events.py:87caller, base.py:132new_func, decorators.py:21invoke, core.py:610invoke, core.py:1066invoke, core.py:1259main, core.py:782start, base.py:358
, myEvent.py:18

������������������

self.consume = {method} 
>self.queue = {Queue}
-> #>self._receive = {method}
>Consumer = {partial} functools.partial(
,
, on_decode_error=
>)channel = {Channel}
self = {EventReceiver}

������������

Events+-----------------------------------------+| EventReceiver(ConsumerMixin)            ||                                         ||                                         ||                                         |  consume|                                         |               +------------------+|                            capture  +-----------------> | Consumer         ||                                         |               |                  ||                                         |               |                  ||                                         |               |                  ||                           _receive  <----------------------+ callbacks     ||                                         |               |                  ||                                         |               |                  ||                                         |               +------------------++-----------------------------------------+

5.7 ������

������������������������������ _receive ���������������

def _receive(self, body, message, list=list, isinstance=isinstance):        if isinstance(body, list):  # celery 4.0+: List of events            process, from_message = self.process, self.event_from_message            [process(*from_message(event)) for event in body]        else:            self.process(*self.event_from_message(body))

5.8 ������

���������������������������������������

def process(self, type, event):        """Process event by dispatching to configured handler."""        handler = self.handlers.get(type) or self.handlers.get('*')        handler and handler(event)

���������������

��������� Receiver . handlers ��������� Receiver������ ��������� handlers={'*': state.event}������������������������������������������������

Events+-----------------------------------------+| EventReceiver(ConsumerMixin)            ||                                         ||                                         ||                                         |  consume|                                         |               +------------------+|                            capture  +-----------------> | Consumer         ||                                         |               |                  ||                                         |               |                  ||                                         |               |                  ||                           _receive  <----------------------+ callbacks     ||                                         |               |                  ||                                         |               |                  ||                                         |               +------------------+|                                         ||                            handlers +------------+|                                         |        |      +------------------++-----------------------------------------+        |      |state             |                                                   |      |                  |                                                   |      |                  |                                                   +-------->event           |                                                          |                  |                                                          |                  |                                                          +------------------+

5.9 state������������

���������������

@cached_property    def _event(self):        return self._create_dispatcher()

���������������������������

  1. ������ group ��� handler���������������������������������������������������������������������������������������������pass
  2. ��������� worker ��� Event������������ worker ���������������
  3. ��������� task ��� Event������������ task ���������������
def _create_dispatcher(self):        # noqa: C901        # pylint: disable=too-many-statements        # This code is highly optimized, but not for reusability.        get_handler = self.handlers.__getitem__        event_callback = self.event_callback        wfields = itemgetter('hostname', 'timestamp', 'local_received')        tfields = itemgetter('uuid', 'hostname', 'timestamp',                             'local_received', 'clock')        taskheap = self._taskheap        th_append = taskheap.append        th_pop = taskheap.pop        # Removing events from task heap is an O(n) operation,        # so easier to just account for the common number of events        # for each task (PENDING->RECEIVED->STARTED->final)        #: an O(n) operation        max_events_in_heap = self.max_tasks_in_memory * self.heap_multiplier        add_type = self._seen_types.add        on_node_join, on_node_leave = self.on_node_join, self.on_node_leave        tasks, Task = self.tasks, self.Task        workers, Worker = self.workers, self.Worker        # avoid updating LRU entry at getitem        get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__        get_task_by_type_set = self.tasks_by_type.__getitem__        get_task_by_worker_set = self.tasks_by_worker.__getitem__        def _event(event,                   timetuple=timetuple, KeyError=KeyError,                   insort=bisect.insort, created=True):            self.event_count += 1            if event_callback:                event_callback(self, event)            group, _, subject = event['type'].partition('-')            try:                handler = get_handler(group)            except KeyError:                pass            else:                return handler(subject, event), subject            if group == 'worker':                try:                    hostname, timestamp, local_received = wfields(event)                except KeyError:                    pass                else:                    is_offline = subject == 'offline'                    try:                        worker, created = get_worker(hostname), False                    except KeyError:                        if is_offline:                            worker, created = Worker(hostname), False                        else:                            worker = workers[hostname] = Worker(hostname)                    worker.event(subject, timestamp, local_received, event)                    if on_node_join and (created or subject == 'online'):                        on_node_join(worker)                    if on_node_leave and is_offline:                        on_node_leave(worker)                        workers.pop(hostname, None)                    return (worker, created), subject            elif group == 'task':                (uuid, hostname, timestamp,                 local_received, clock) = tfields(event)                # task-sent event is sent by client, not worker                is_client_event = subject == 'sent'                try:                    task, task_created = get_task(uuid), False                except KeyError:                    task = tasks[uuid] = Task(uuid, cluster_state=self)                    task_created = True                if is_client_event:                    task.client = hostname                else:                    try:                        worker = get_worker(hostname)                    except KeyError:                        worker = workers[hostname] = Worker(hostname)                    task.worker = worker                    if worker is not None and local_received:                        worker.event(None, local_received, timestamp)                origin = hostname if is_client_event else worker.id                # remove oldest event if exceeding the limit.                heaps = len(taskheap)                if heaps + 1 > max_events_in_heap:                    th_pop(0)                # most events will be dated later than the previous.                timetup = timetuple(clock, timestamp, origin, ref(task))                if heaps and timetup > taskheap[-1]:                    th_append(timetup)                else:                    insort(taskheap, timetup)                if subject == 'received':                    self.task_count += 1                task.event(subject, timestamp, local_received, event)                task_name = task.name                if task_name is not None:                    add_type(task_name)                    if task_created:  # add to tasks_by_type index                        get_task_by_type_set(task_name).add(task)                        get_task_by_worker_set(hostname).add(task)                if task.parent_id:                    try:                        parent_task = self.tasks[task.parent_id]                    except KeyError:                        self._add_pending_task_child(task)                    else:                        parent_task.children.add(task)                try:                    _children = self._tasks_to_resolve.pop(uuid)                except KeyError:                    pass                else:                    task.children.update(_children)                return (task, task_created), subject        return _event

���������������

Events+-----------------------------+| EventReceiver(ConsumerMixin ||                             ||                             |               +------------------+|                             |  consume      | Consumer         ||                             |               |                  ||                capture  +-----------------> |                  ||                             |               |                  ||                             |               |                  ||                             |               |                  ||               _receive  <----------------------+ callbacks     ||                             |               |                  ||                             |               |                  ||                             |               +------------------+|                             ||                handlers +------------+|                             |        |      +------------------------++-----------------------------+        |      |state                   |                                       |      |                        |                                       |      |                        |                                       +---------> event +---+         |                                              |              |         |                                              |              |         |                                              |              v         |                                              |     _create_dispatcher |                                              |              +         |                                              |              |         |                                              |              |         |                                              |              |         |                                              +------------------------+                                                             |                                                             |                                                    +--------+------+                                group == 'worker'   |               | group == 'task'                                                    |               |                                                    v               v                                          worker.event          task.event

������������������������

Producer Scope   +         Broker      +   Consumer Scope                                      |                     |+-----------------------------+       |     Redis pubsub    |     Events| EventDispatcher             |       |                     ||                             |       |                     |     +-----------------------------+|                             |       |                     |     | EventReceiver(ConsumerMixin ||                             |       |                     |     |                             ||        connection           |       |                     |     |                             |               +------------------+|                             |       |                     |     |                             |  consume      | Consumer         ||        channel              |       |                     |     |                             |               |                  ||                             |       |                     |     |                capture  +-----------------> |                  ||        producer  +----------------------->  Event +-----------> |                             |               |                  ||                             |       |                     |     |                             |               |                  ||        exchange             |       |                     |     |                             |               |                  ||                             |       |                     |     |               _receive  <----------------------+ callbacks     ||        hostname             |       |                     |     |                             |               |                  ||                             |       |                     |     |                             |               |                  ||        groups               |       |                     |     |                             |               +------------------+|                             |       |                     |     |                             ||        _outbound_buffer     |       |                     |     |                handlers +------------+|                             |       |                     |     |                             |        |      +------------------------+|        clock                |       |                     |     +-----------------------------+        |      |state                   ||                             |       |                     |                                            |      |                        |+-----------------------------+       |                     |                                            |      |                        |                                      |                     |                                            +---------> event +---+         |                                      |                     |                                                   |              |         |                                      |                     |                                                   |              |         |                                      |                     |                                                   |              v         |                                      |                     |                                                   |     _create_dispatcher |                                      |                     |                                                   |              +         |                                      |                     |                                                   |              |         |                                      |                     |                                                   |              |         |                                      |                     |                                                   |              |         |                                      |                     |                                                   +------------------------+                                      |                     |                                                                  |                                      |                     |                                                                  |                                      |                     |                                                         +--------+------+                                      |                     |                                     group == 'worker'   |               | group == 'task'                                      |                     |                                                         |               |                                      |                     |                                                         v               v                                      +                     +                                               worker.event          task.event

���������������

���������Celery ������������������������������������ ���������������������������������

0xEE ������������

������������������������������������������������������������������

������������������������������������

������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

0xFF ������

上一篇:泛型(泛型擦除)
下一篇:NSIS制作安装包笔记(二):NSIS使用NSIS+Qt界面制作安装包流程

发表评论

最新留言

很好
[***.229.124.182]2025年04月22日 21时43分33秒