本文共 28859 字,大约阅读时间需要 96 分钟。
celery源码分析
本文环境python3.5.2,celery4.0.2,django1.10.x系列
celery的任务发送
在Django项目中,使用装饰器来包装待执行的任务,示例如下:
from celery import Celery, shared_task

# ``celery.app`` is a package, not an application instance, so importing it
# (as ``from celery import app``) cannot provide the ``@app.task`` decorator.
# In a Django project the instance is normally created in proj/celery.py;
# it is created inline here so the example is self-contained.
app = Celery('proj')


@shared_task
def add(x, y):
    """Return ``x + y``; registered as a shared task usable by any app."""
    return x + y


@app.task(bind=True)
def debug_task(self):
    """Print the current request; ``bind=True`` passes the task as ``self``."""
    print('Request: {0!r}'.format(self.request))
下面分析一下Task在celery中是如何被发送和执行的。
首先看shared_task函数:
def shared_task(*args, **kwargs):
    """Create shared task (decorator).

    This can be used by library authors to create tasks that'll work
    for any app environment.

    Tasks decorated with ``shared_task`` can therefore be called from any
    app, not only the one that was current when the task was defined.

    Returns:
        ~celery.local.Proxy: A proxy that always takes the task from the
        current apps task registry.

    Example:

        >>> from celery import Celery, shared_task
        >>> @shared_task
        ... def add(x, y):
        ...     return x + y
        ...
        >>> app1 = Celery(broker='amqp://')
        >>> add.app is app1
        True
        >>> app2 = Celery(broker='redis://')
        >>> add.app is app2
        True
    """
    def create_shared_task(**options):

        def __inner(fun):
            # Explicit task name given to the decorator, if any; used by
            # task_by_cons below to look the task up.
            name = options.get('name')
            # Set as shared task so that unfinalized apps,
            # and future apps will register a copy of this task.
            # (The callback is registered with ``_state`` and receives each
            # app when that app is finalized.)
            _state.connect_on_app_finalize(
                lambda app: app._task_from_fun(fun, **options)
            )

            # Force all finalized apps to take this task as well.
            for app in _state._get_active_apps():
                # Unfinalized apps are skipped here: they pick the task up
                # through the finalize callback registered above.
                if app.finalized:
                    # Register while holding the app's finalize lock.
                    with app._finalize_mutex:
                        app._task_from_fun(fun, **options)

            # Return a proxy that always gets the task from the current
            # apps task registry.
            def task_by_cons():
                app = _state.get_current_app()
                # Look up by the explicit name, or by the same generated
                # name the app uses when it registers the function.
                return app.tasks[
                    name or app.gen_task_name(fun.__name__, fun.__module__)
                ]
            # The proxy calls task_by_cons lazily on each access, so the
            # task is resolved against whichever app is current at that time.
            return Proxy(task_by_cons)
        return __inner

    # Bare ``@shared_task`` usage: the decorated function is the only
    # positional argument, so decorate it immediately.
    if len(args) == 1 and callable(args[0]):
        return create_shared_task(**kwargs)(args[0])
    # ``@shared_task(...)`` with options: return the decorator itself.
    return create_shared_task(*args, **kwargs)
按照示例中不带参数的用法,shared_task返回的是一个Proxy实例,其构造参数就是task_by_cons。下面看一下Proxy类的实现,该类位于celery/local.py中:
class Proxy(object): """Proxy to another object.""" # Code stolen from werkzeug.local.Proxy. __slots__ = ('__local', '__args', '__kwargs', '__dict__') def __init__(self, local, args=None, kwargs=None, name=None, __doc__=None): object.__setattr__(self, '_Proxy__local', local) # 将传入参数local设置到_Proxy__local属性中 object.__setattr__(self, '_Proxy__args', args or ()) # 设置列表属性 object.__setattr__(self, '_Proxy__kwargs', kwargs or {}) # 设置键值属性 if name is not None: object.__setattr__(self, '__custom_name__', name) if __doc__ is not None: object.__setattr__(self, '__doc__', __doc__) ... def _get_current_object(self): """Get current object. This is useful if you want the real object behind the proxy at a time for performance reasons or because you want to pass the object into a different context. """ loc = object.__getattribute__(self, '_Proxy__local') # 获取初始化传入的local if not hasattr(loc, '__release_local__'): # 如果没有__release_local__属性 return loc(*self.__args, **self.__kwargs) # 函数调用,将初始化的值传入调用该函数 try: # pragma: no cover # not sure what this is about return getattr(loc, self.__name__) # 获取当前__name__属性值 except AttributeError: # pragma: no cover raise RuntimeError('no object bound to {0.__name__}'.format(self)) ... def __getattr__(self, name): if name == '__members__': return dir(self._get_current_object()) return getattr(self._get_current_object(), name) # 获取obj的属性 def __setitem__(self, key, value): self._get_current_object()[key] = value # 设置key val def __delitem__(self, key): del self._get_current_object()[key] # 删除对应key def __setslice__(self, i, j, seq): self._get_current_object()[i:j] = seq # 列表操作 def __delslice__(self, i, j): del self._get_current_object()[i:j] def __setattr__(self, name, value): setattr(self._get_current_object(), name, value) # 设置属性 def __delattr__(self, name): delattr(self._get_current_object(), name) # 删除对应属性
以上只选取了部分方法进行分析。其核心是_get_current_object:如果传入的local对象没有__release_local__属性,就把它当作函数直接调用来获取真实对象;否则从local上获取对应的属性。Proxy上的属性读写、删除等操作都会转发到这个真实对象上。
此时在初始化过程中,为每个app添加该任务时,会调用到app._task_from_fun(fun, **options),
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    """Create and register a task for ``fun`` on this app, or return the
    task already registered under the same name.

    Extra ``options`` become class attributes of the generated task class.
    """
    if not self.finalized and not self.autofinalize:
        raise RuntimeError('Contract breach: app not finalized')
    # Use the explicit name if given, otherwise one generated from the
    # function's name and module.
    name = name or self.gen_task_name(fun.__name__, fun.__module__)
    # Fall back to the app's own Task class as the base class.
    base = base or self.Task

    if name not in self._tasks:
        # With bind=True ``fun`` stays a plain function, so it is called as a
        # method and receives the task instance as its first argument;
        # otherwise it is wrapped in staticmethod so no ``self`` is passed.
        run = fun if bind else staticmethod(fun)
        # Build a subclass of ``base`` named after the function and
        # instantiate it right away (note the trailing ``()``).
        task = type(fun.__name__, (base,), dict({
            'app': self,
            'name': name,
            'run': run,
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            # Captured signature; apply_async uses it to check arguments.
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()
        # for some reason __qualname__ cannot be set in type()
        # so we have to set it here.
        try:
            task.__qualname__ = fun.__qualname__
        except AttributeError:
            pass
        self._tasks[task.name] = task
        task.bind(self)  # connects task to this app

        autoretry_for = tuple(options.get('autoretry_for', ()))
        retry_kwargs = options.get('retry_kwargs', {})

        # Wrap ``run`` so the listed exceptions trigger ``task.retry``;
        # the ``_orig_run`` check prevents wrapping the same task twice.
        if autoretry_for and not hasattr(task, '_orig_run'):

            @wraps(task.run)
            def run(*args, **kwargs):
                try:
                    return task._orig_run(*args, **kwargs)
                except autoretry_for as exc:
                    raise task.retry(exc=exc, **retry_kwargs)

            task._orig_run, task.run = task.run, run
    else:
        # Already registered under this name: reuse the existing task.
        task = self._tasks[name]
    return task
其中base在默认情况下是celery.app.task:Task;用type动态生成该Task子类并实例化后,调用了task.bind(self)方法:
@classmethod
def bind(cls, app):
    """Bind this task class to ``app`` and fill unset defaults from its config.

    Returns:
        The ``app`` that was bound.
    """
    # Remember whether the class had been bound before this call.
    was_bound, cls.__bound__ = cls.__bound__, True
    cls._app = app
    conf = app.conf
    cls._exec_options = None  # clear option cache

    # Argument type checking follows the app setting unless set on the task.
    if cls.typing is None:
        cls.typing = app.strict_typing

    # For each (attribute, setting) pair, attributes still left as None
    # are filled in from the app configuration.
    for attr_name, config_name in cls.from_config:
        if getattr(cls, attr_name, None) is None:
            setattr(cls, attr_name, conf[config_name])

    # decorate with annotations from config.
    # One-time setup, skipped when an already-bound class is rebound.
    if not was_bound:
        cls.annotate()

        from celery.utils.threads import LocalStack
        # Stack of request contexts for this task class (presumably
        # thread-local, as the name suggests; confirm in celery.utils.threads).
        cls.request_stack = LocalStack()

    # PeriodicTask uses this to add itself to the PeriodicTask schedule.
    cls.on_bound(app)

    return app
在Django项目中,按如下方式调用该异步任务:
add.delay(1,2)  # send add(1, 2) for asynchronous execution; returns an AsyncResult
此时会通过代理类获取到真实task对象的delay方法:
def delay(self, *args, **kwargs):
    """Star argument version of :meth:`apply_async`.

    Does not support the extra options enabled by :meth:`apply_async`.

    Arguments:
        *args (Any): Positional arguments passed on to the task.
        **kwargs (Any): Keyword arguments passed on to the task.

    Returns:
        celery.result.AsyncResult: Future promise.
    """
    # Pack the star-arguments into the (args, kwargs) form apply_async takes.
    return self.apply_async(args, kwargs)
此时直接调用了self.apply_async方法,
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                link=None, link_error=None, shadow=None, **options):
    """Apply tasks asynchronously by sending a message.

    Arguments:
        args (Tuple): The positional arguments to pass on to the task.

        kwargs (Dict): The keyword arguments to pass on to the task.

        countdown (float): Number of seconds into the future that the
            task should execute.  Defaults to immediate execution.

        eta (~datetime.datetime): Absolute time and date of when the task
            should be executed.  May not be specified if `countdown`
            is also supplied.

        expires (float, ~datetime.datetime): Datetime or
            seconds in the future for the task should expire.
            The task won't be executed after the expiration time.

        shadow (str): Override task name used in logs/monitoring.
            Default is retrieved from :meth:`shadow_name`.

        connection (kombu.Connection): Re-use existing broker connection
            instead of acquiring one from the connection pool.

        retry (bool): If enabled sending of the task message will be
            retried in the event of connection loss or failure.
            Default is taken from the :setting:`task_publish_retry`
            setting.  Note that you need to handle the
            producer/connection manually for this to work.

        retry_policy (Mapping): Override the retry policy used.
            See the :setting:`task_publish_retry_policy` setting.

        queue (str, kombu.Queue): The queue to route the task to.
            This must be a key present in :setting:`task_queues`, or
            :setting:`task_create_missing_queues` must be
            enabled.  See :ref:`guide-routing` for more
            information.

        exchange (str, kombu.Exchange): Named custom exchange to send the
            task to.  Usually not used in combination with the ``queue``
            argument.

        routing_key (str): Custom routing key used to route the task to a
            worker server.  If in combination with a ``queue`` argument
            only used to specify custom routing keys to topic exchanges.

        priority (int): The task priority, a number between 0 and 9.
            Defaults to the :attr:`priority` attribute.

        serializer (str): Serialization method to use.
            Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
            serialization method that's been registered
            with :mod:`kombu.serialization.registry`.
            Defaults to the :attr:`serializer` attribute.

        compression (str): Optional compression method
            to use.  Can be one of ``zlib``, ``bzip2``,
            or any custom compression methods registered with
            :func:`kombu.compression.register`.
            Defaults to the :setting:`task_compression` setting.

        link (~@Signature): A single, or a list of tasks signatures
            to apply if the task returns successfully.

        link_error (~@Signature): A single, or a list of task signatures
            to apply if an error occurs while executing the task.

        producer (kombu.Producer): custom producer to use when publishing
            the task.

        add_to_parent (bool): If set to True (default) and the task
            is applied while executing another task, then the result
            will be appended to the parent tasks ``request.children``
            attribute.  Trailing can also be disabled by default using the
            :attr:`trail` attribute

        publisher (kombu.Producer): Deprecated alias to ``producer``.

        headers (Dict): Message headers to be included in the message.

    Returns:
        ~@AsyncResult: Promise of future evaluation.

    Raises:
        TypeError: If not enough arguments are passed, or too many
            arguments are passed.  Note that signature checks may
            be disabled by specifying ``@task(typing=False)``.
        kombu.exceptions.OperationalError: If a connection to the
           transport cannot be made, or if the connection is lost.

    Note:
        Also supports all keyword arguments supported by
        :meth:`kombu.Producer.publish`.
    """
    # Check the call arguments against the signature stored in
    # ``__header__`` (skipped when typing is disabled).
    if self.typing:
        try:
            check_arguments = self.__header__
        except AttributeError:  # pragma: no cover
            pass
        else:
            check_arguments(*(args or ()), **(kwargs or {}))

    app = self._get_app()
    # Eager mode: run the task locally via ``apply`` instead of sending
    # a message, and return that local result.
    if app.conf.task_always_eager:
        return self.apply(args, kwargs, task_id=task_id or uuid(),
                          link=link, link_error=link_error, **options)
    # add 'self' if this is a "task_method".
    if self.__self__ is not None:
        args = args if isinstance(args, tuple) else tuple(args or ())
        args = (self.__self__,) + args
        shadow = shadow or self.shadow_name(args, kwargs, options)

    # Merge the task's default execution options with the caller's
    # options; caller-supplied values take precedence.
    preopts = self._get_exec_options()
    options = dict(preopts, **options) if options else preopts
    # Hand off to the app, which routes and publishes the task message.
    return app.send_task(
        self.name, args, kwargs, task_id=task_id, producer=producer,
        link=link, link_error=link_error, result_cls=self.AsyncResult,
        shadow=shadow, task_type=self,
        **options
    )
该方法比较复杂,主要是组装待发送任务的参数,如connection、queue、exchange、routing_key等。如果配置了task_always_eager,则在本地直接执行任务并返回结果;否则调用app实例的send_task发送任务:
def send_task(self, name, args=None, kwargs=None, countdown=None,
              eta=None, task_id=None, producer=None, connection=None,
              router=None, result_cls=None, expires=None,
              publisher=None, link=None, link_error=None,
              add_to_parent=True, group_id=None, retries=0, chord=None,
              reply_to=None, time_limit=None, soft_time_limit=None,
              root_id=None, parent_id=None, route_name=None,
              shadow=None, chain=None, task_type=None, **options):
    """Send task by name.

    Supports the same arguments as :meth:`@-Task.apply_async`.

    Arguments:
        name (str): Name of task to call (e.g., `"tasks.add"`).
        result_cls (~@AsyncResult): Specify custom result class.
    """
    parent = have_parent = None
    amqp = self.amqp
    # Generate a task id unless the caller supplied one.
    task_id = task_id or uuid()
    producer = producer or publisher  # XXX compat
    router = router or amqp.router
    conf = self.conf
    if conf.task_always_eager:  # pragma: no cover
        # Eager mode only applies to Task.apply_async; warn and send anyway.
        warnings.warn(AlwaysEagerIgnored(
            'task_always_eager has no effect on send_task',
        ), stacklevel=2)
    # Let the router compute the delivery options for this task name.
    options = router.route(
        options, route_name or name, args, kwargs, task_type)

    # When called from inside a running worker task, inherit root/parent
    # ids from it so the new task is linked into the same workflow.
    if not root_id or not parent_id:
        parent = self.current_worker_task
        if parent:
            if not root_id:
                root_id = parent.request.root_id or parent.request.id
            if not parent_id:
                parent_id = parent.request.id

    # Build the protocol message (headers, body and properties).
    message = amqp.create_task_message(
        task_id, name, args, kwargs, countdown, eta, group_id,
        expires, retries, chord,
        maybe_list(link), maybe_list(link_error),
        reply_to or self.oid, time_limit, soft_time_limit,
        self.conf.task_send_sent_event,
        root_id, parent_id, shadow, chain,
    )

    # Publish on the caller's connection if one was given; otherwise
    # producer_or_acquire supplies a producer.
    if connection:
        producer = amqp.Producer(connection)
    with self.producer_or_acquire(producer) as P:
        with P.connection._reraise_as_library_errors():
            self.backend.on_task_call(P, task_id)
            amqp.send_task_message(P, name, message, **options)
    # The caller gets a result handle keyed by the task id.
    result = (result_cls or self.AsyncResult)(task_id)
    if add_to_parent:
        # NOTE(review): have_parent is never set in the root/parent branch
        # above, so current_worker_task is looked up again here.
        if not have_parent:
            parent, have_parent = self.current_worker_task, True
        if parent:
            parent.add_trail(result)
    return result
至此,一个任务就发送出去了,等待消费者来消费。
worker消费task的概述
在之前分析celery worker的启动过程时可以看到,worker最后会开启事件循环等待任务到来,其中注册的消息回调函数就是on_task_received:
def on_task_received(message):
    """Consumer callback: find the strategy for an incoming task message
    and run it.

    ``self``, ``strategies``, ``callbacks``, ``call_soon`` and the
    ``on_*`` handlers come from an enclosing scope not shown here.
    """
    # payload will only be set for v1 protocol, since v2
    # will defer deserializing the message body to the pool.
    payload = None
    try:
        type_ = message.headers['task']                # protocol v2
    except TypeError:
        # Headers are not subscriptable (presumably None): unknown message.
        return on_unknown_message(None, message)
    except KeyError:
        # No 'task' header: fall back to protocol v1, where the task
        # name is stored in the decoded body.
        try:
            payload = message.decode()
        except Exception as exc:  # pylint: disable=broad-except
            return self.on_decode_error(message, exc)
        try:
            type_, payload = payload['task'], payload  # protocol v1
        except (TypeError, KeyError):
            return on_unknown_message(payload, message)
    try:
        # Handler registered for this task name by update_strategies.
        strategy = strategies[type_]
    except KeyError as exc:
        return on_unknown_task(None, message, exc)
    else:
        try:
            # ack/reject are wrapped in promises scheduled via call_soon
            # rather than being invoked directly.
            strategy(
                message, payload,
                promise(call_soon, (message.ack_log_error,)),
                promise(call_soon, (message.reject_log_error,)),
                callbacks,
            )
        except InvalidTaskError as exc:
            return on_invalid_task(payload, message, exc)
至此,从Django客户端发送的消息就到达了worker进程,并交给对应的strategy进行处理。
大概的消费流程如下, 此时的strategies就是在consumer的task实例在启动start时,调用的update_strategies方法,
def update_strategies(self):
    """Build the message handler and tracer for every registered task."""
    loader = self.app.loader
    for name, task in items(self.app.tasks):
        # Map task name -> handler returned by task.start_strategy().
        self.strategies[name] = task.start_strategy(self.app, self)
        # Pre-build the tracer that will later execute this task.
        task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                      app=self.app)
此时我们继续查看task.start_strategy函数,
def start_strategy(self, app, consumer, **kwargs):
    # Instantiate ``self.Strategy`` for this task, i.e. build the handler
    # the consumer calls for each incoming message of this task type
    # (the task itself is not instantiated here).
    return instantiate(self.Strategy, self, app, consumer, **kwargs)
此时self.Strategy的默认值是celery.worker.strategy:default,
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Returns ``task_message_handler``, the callable that turns one incoming
    message for ``task`` into a request and dispatches it.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    # Consumer attributes are bound to locals once, because the handler
    # below runs for every message.
    hostname = consumer.hostname
    connection_errors = consumer.connection_errors
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher
    events = eventer and eventer.enabled
    send_event = eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    body_can_be_buffer = consumer.pool.body_can_be_buffer
    # Request class specialised for this task, pool and host.
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        # ``body is None`` means protocol v2 (body left undecoded);
        # otherwise convert the v1 payload to the v2 layout.
        if body is None:
            body, headers, decoded, utc = (
                message.body, message.headers, False, True,
            )
            if not body_can_be_buffer:
                body = bytes(body) if isinstance(body, buffer_t) else body
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )
        if _does_info:
            info('Received task: %s', req)
        # Possibly expired or revoked: req.revoked() makes the final call.
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        # Emit 'task-received' when both the dispatcher and the task
        # have events enabled.
        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )

        if req.eta:
            # Tasks with an ETA are parked on the consumer timer rather
            # than dispatched now; an unconvertible ETA rejects the message
            # without requeueing it.
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, timezone.local)
            except (OverflowError, ValueError) as exc:
                error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                      req.eta, exc, req.info(safe=True), exc_info=True)
                req.reject(requeue=False)
            else:
                # NOTE(review): presumably raises the prefetch count while
                # the request waits on the timer; confirm.
                consumer.qos.increment_eventually()
                call_at(eta, apply_eta_task, (req,), priority=6)
        else:
            # With rate limiting enabled and a bucket for this task,
            # hand the request to the limiter instead of dispatching it.
            if rate_limits_enabled:
                bucket = get_bucket(task.name)
                if bucket:
                    return limit_task(req, bucket, 1)
            task_reserved(req)
            if callbacks:
                [callback(req) for callback in callbacks]
            # Dispatch through consumer.on_task_request.
            handle(req)

    return task_message_handler
task_message_handler最终调用的handle(即consumer.on_task_request),就是consumer初始化时传入的w.process_task:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    try:
        req.execute_using_pool(self.pool)
    except TaskRevokedError:
        # The request turned out to be revoked; try to release it
        # immediately. ``_quick_release`` may be missing, which is ignored.
        try:
            self._quick_release()   # Issue 877
        except AttributeError:
            pass
接着会调用req.execute_using_pool来执行该任务,该方法定义在create_request_cls函数所生成的Request类中:
class Request(base):
    # Defined inside create_request_cls: ``base``, ``revoked_tasks``,
    # ``apply_async``, ``trace`` and the default time limits come from that
    # enclosing scope, which is not shown here.

    def execute_using_pool(self, pool, **kwargs):
        """Submit this request to the worker pool and return the pool result.

        ``pool`` and ``kwargs`` are not used in this body; ``apply_async``
        is taken from the enclosing scope instead.

        Raises:
            TaskRevokedError: if the task is expired or revoked.
        """
        task_id = self.id
        # Possibly expired or revoked: self.revoked() makes the final call.
        if (self.expires or task_id in revoked_tasks) and self.revoked():
            raise TaskRevokedError(task_id)

        # Per-request limits, falling back to the defaults below.
        time_limit, soft_time_limit = self.time_limits
        # Run ``trace`` in the pool with the serialized request data; the
        # pool reports progress back through the callbacks.
        result = apply_async(
            trace,
            args=(self.type, task_id, self.request_dict, self.body,
                  self.content_type, self.content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or default_soft_time_limit,
            timeout=time_limit or default_time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        # pylint: disable=attribute-defined-outside-init
        self._apply_result = maybe(ref, result)
        return result
这里调用的apply_async其实就是pool.apply_async方法,传入的执行函数trace默认就是trace_task_ret:
def trace_task(task, uuid, args, kwargs, request={}, **opts):
    """Trace task execution.

    Runs ``task`` through its cached tracer (``task.__trace__``), building
    the tracer on first use. Unexpected errors are reported through
    ``report_internal_error`` and returned as a ``trace_ok_t`` tuple
    instead of being raised.
    """
    # NOTE(review): ``request={}`` is a shared mutable default; it is safe
    # only if the tracer never mutates it. Confirm.
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request)
    except Exception as exc:
        return trace_ok_t(report_internal_error(task, exc), None, 0.0, None)


def _trace_task_ret(name, uuid, request, body, content_type,
                    content_encoding, loads=loads_message, app=None,
                    **extra_request):
    """Pool-side entry point: decode the body, complete the request dict
    and trace the task registered under ``name``."""
    app = app or current_app._get_current_object()
    embed = None
    if content_type:
        # Serialized body: decode it, restricted to accepted content types.
        accept = prepare_accept_content(app.conf.accept_content)
        args, kwargs, embed = loads(
            body, content_type, content_encoding, accept=accept,
        )
    else:
        # Body is already an (args, kwargs, embed) tuple.
        args, kwargs, embed = body
    hostname = gethostname()
    # ``**embed or {}`` parses as ``**(embed or {})``, so a None embed is
    # tolerated.
    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **embed or {})
    R, I, T, Rstr = trace_task(app.tasks[name],
                               uuid, args, kwargs, request, app=app)
    # First element flags whether trace produced an Info object
    # (presumably the failure path); T is the runtime.
    return (1, R, T) if I else (0, Rstr, T)
trace_task_ret = _trace_task_ret
在update_strategies中为每个task设置的__trace__是:
# One tracer per registered task, built ahead of time by the consumer.
task.__trace__ = build_tracer(name, task, loader, self.hostname, app=self.app)
build_tracer函数的部分代码如下:
def build_tracer(name, task, loader=None, hostname=None, store_errors=True, Info=TraceInfo, eager=False, propagate=False, app=None, monotonic=monotonic, truncate=truncate, trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES): fun = task if task_has_custom(task, '__call__') else task.run # 获取task对应的run函数 ... def trace_task(uuid, args, kwargs, request=None): # R - is the possibly prepared return value. # I - is the Info object. # T - runtime # Rstr - textual representation of return value # retval - is the always unmodified return value. # state - is the resulting task state. # This function is very long because we've unrolled all the calls # for performance reasons, and because the function is so long # we want the main variables (I, and R) to stand out visually from the # the rest of the variables, so breaking PEP8 is worth it ;) R = I = T = Rstr = retval = state = None task_request = None time_start = monotonic() ... # -*- TRACE -*- try: R = retval = fun(*args, **kwargs) # 执行对应的函数 state = SUCCESS except Reject as exc: ... return trace_task
这里调用的fun就是task真正要执行的函数(若task自定义了__call__,则是task对象本身),由此执行了对应的task并获得了返回结果。
至此,一个简单的消息从发送到消费的过程就完成了。

本文总结
本文主要讲述了一个task从客户端发送出去,到服务端(worker)获取并消费该任务的过程。本文的分析略显粗略,只是大致描述了任务的发送和消费流程,其中很多细节没有一一展开,大家如有兴趣可自行分析。
转载地址:https://blog.csdn.net/qq_33339479/article/details/80961182 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!