Celery source code analysis: periodic tasks
Published: 2021-07-25 13:04:39  Category: technical article


Celery source code analysis

Environment for this article: Python 3.5.2, Celery 4.0.2, Django 1.10.x.

Celery periodic tasks and Django configuration

Celery can also run periodic (scheduled) tasks. To configure it together with Django:

1. Add the following task to the celery_app.tasks module:

from celery import shared_task


@shared_task
def beat_task():
    print("beat_task")

2. Add the following to the Django settings file (celery_django/settings.py):

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'celery_app',
    'django_celery_beat',
]

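This is needed because the Celery + Django setup stores and manages the schedule through the third-party library django_celery_beat, so that app must be registered.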

3. Add the following to the project's Celery module (celery_django/celery.py):

# app is the Celery instance created earlier in this file
from celery_django import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'test_beat': {
        'task': 'celery_app.tasks.beat_task',
        'schedule': timedelta(seconds=3),
    },
}
app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)
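CELERYBEAT_SCHEDULE is the pre-4.0 uppercase name. Celery 4 calls this setting `beat_schedule`, and `setup_schedule` (analyzed later) reads it as `self.app.conf.beat_schedule`. Below is a minimal sketch of the same entry under the new name. It is a plain dict, so it can be inspected without a running app. The assignment in the comment assumes the `app` instance from celery_django/celery.py.

```python
from datetime import timedelta

# Same entry as above, under the Celery 4 lowercase setting name.
BEAT_SCHEDULE = {
    'test_beat': {
        'task': 'celery_app.tasks.beat_task',  # dotted name of the registered task
        'schedule': timedelta(seconds=3),      # run every 3 seconds
    },
}

# In celery_django/celery.py this would be applied to the existing app:
#     app.conf.beat_schedule = BEAT_SCHEDULE

for name, entry in BEAT_SCHEDULE.items():
    print(name, entry['task'], entry['schedule'].total_seconds())
```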

The configuration is now complete. First, start the beat process:

(venv) wuzideMacBook-Air:celery_django wuzi$ celery beat -A celery_django -S django

Then start a worker to consume the tasks whose scheduled time has arrived:

celery -A celery_django worker

The worker's terminal then shows output like this:

 -------------- celery@wuzideMacBook-Air.local v4.0.0 (latentcall)
---- **** -----
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2018-07-12 07:24:56
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celery_django:0x108934240
- ** ---------- .> transport:   redis://127.0.0.1:6379/7
- ** ---------- .> results:     redis://127.0.0.1:6379/6
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[2018-07-12 07:24:57,512: WARNING/MainProcess] /Users/wuzi/python35/venv/lib/python3.5/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-07-12 07:24:57,546: WARNING/PoolWorker-3] beat_task
[2018-07-12 07:24:57,550: WARNING/PoolWorker-4] beat_task
[2018-07-12 07:24:57,551: WARNING/PoolWorker-2] beat_task
[2018-07-12 07:24:57,560: WARNING/PoolWorker-1] beat_task

Analysis of Celery periodic tasks

From the celery_django directory, run the following command in a terminal:

celery beat -A celery_django -S django

As in the earlier analysis, CeleryCommand looks up the class registered under `beat` in its commands mapping and runs it. Once again the following code is executed:

return cls(
    app=self.app, on_error=self.on_error,
    no_color=self.no_color, quiet=self.quiet,
    on_usage_error=partial(self.on_usage_error, command=command),
).run_from_argv(self.prog_name, argv[1:], command=argv[0])  # instantiate the class, then call run_from_argv

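Here cls is the beat command class. In bin/beat.py that class overrides only run and add_arguments. The run_from_argv being executed is therefore the one inherited from Command. It calls Command.handle_argv. After processing the arguments, handle_argv invokes the command object itself (its `__call__` method), and `__call__` calls run. At that point, run resolves to the method overridden in the beat class: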

def run(self, detach=False, logfile=None, pidfile=None, uid=None,
        gid=None, umask=None, workdir=None, **kwargs):
    if not detach:                                  # running in the foreground (not daemonized)
        maybe_drop_privileges(uid=uid, gid=gid)
    kwargs.pop('app', None)                         # drop the app argument passed in
    beat = partial(self.app.Beat,
                   logfile=logfile, pidfile=pidfile, **kwargs)  # partial: bind default arguments for Beat
    if detach:                                      # run in the background?
        with detached(logfile, pidfile, uid, gid, umask, workdir):
            return beat().run()                     # daemonized run
    else:
        return beat().run()                         # foreground run
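The dispatch chain just described, run_from_argv → handle_argv → `__call__` → overridden run, is the template-method pattern. The method finishes by building Beat through `functools.partial`. Below is a minimal sketch of both steps. `Command`, `BeatCommand` and `Beat` here are hypothetical toy classes with a much simpler argument parser, not Celery's real signatures.

```python
from functools import partial


class Command:
    """Toy stand-in for celery.bin.base.Command (hypothetical, heavily simplified)."""

    def run_from_argv(self, prog_name, argv):
        return self.handle_argv(prog_name, argv)   # inherited unchanged by subclasses

    def handle_argv(self, prog_name, argv):
        # Turn "--key=value" arguments into keyword arguments (simplified parser).
        options = {}
        for arg in argv:
            key, _, value = arg.lstrip('-').partition('=')
            options[key.replace('-', '_')] = value
        return self(**options)                     # goes through __call__

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)           # dispatches to the subclass's run()

    def run(self, *args, **kwargs):
        raise NotImplementedError


class Beat:
    """Hypothetical stand-in for celery.apps.beat.Beat."""

    def __init__(self, logfile=None, pidfile=None, **kwargs):
        self.logfile, self.pidfile, self.options = logfile, pidfile, kwargs

    def run(self):
        return 'Beat(logfile={!r}, options={!r}).run()'.format(self.logfile, self.options)


class BeatCommand(Command):
    """Overrides only run(), like the beat command class in bin/beat.py."""

    def run(self, detach=False, logfile=None, pidfile=None, **kwargs):
        kwargs.pop('app', None)
        beat = partial(Beat, logfile=logfile, pidfile=pidfile, **kwargs)  # bind defaults once
        return beat().run()   # same call on both the detached and foreground paths


result = BeatCommand().run_from_argv('celery', ['--logfile=beat.log', '--scheduler=django'])
print(result)   # Beat(logfile='beat.log', options={'scheduler': 'django'}).run()
```

Because partial fixes the arguments once, the daemonized and foreground branches construct Beat identically.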

run() relies on the app's Beat attribute:

@cached_property
def Beat(self, **kwargs):
    """:program:`celery beat` scheduler application.

    See Also:
        :class:`~@Beat`.
    """
    return self.subclass_with_self('celery.apps.beat:Beat')     # import celery.apps.beat:Beat, bound to this app
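Because Beat is a cached property, the bound class is built on first access and then reused. Below is a minimal sketch of a cached-property descriptor that shows this behavior. It is an illustrative implementation, not the one Celery imports. The body of `Beat` only stands in for `subclass_with_self`: the real method subclasses celery.apps.beat.Beat, while this sketch just creates a class carrying an `app` attribute.

```python
class cached_property:
    """Minimal non-data descriptor: compute once, then store on the instance."""

    def __init__(self, func):
        self.func = func
        self.name = func.__name__

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        value = self.func(obj)
        # Storing under the same name shadows this descriptor on later lookups,
        # because a non-data descriptor loses to the instance __dict__.
        obj.__dict__[self.name] = value
        return value


class App:
    def __init__(self):
        self.calls = 0

    @cached_property
    def Beat(self):
        # Stand-in for subclass_with_self(...): build a class bound to this app.
        self.calls += 1
        return type('Beat', (object,), {'app': self})


app = App()
assert app.Beat is app.Beat   # the same class object both times
print(app.calls)              # 1
```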

So self.app.Beat is the Beat class from celery.apps.beat, bound to the app. beat() instantiates it, and run is called on the instance:

def run(self):
    print(str(self.colored.cyan(
        'celery beat v{0} is starting.'.format(VERSION_BANNER))))
    self.init_loader()                  # initialize through the Loader
    self.set_process_title()
    self.start_scheduler()              # start the task scheduler

init_loader calls the loader's init_worker method, which is defined on the BaseLoader class in celery/loaders/base.py:

def init_worker(self):
    if not self.worker_initialized:             # only initialize once
        self.worker_initialized = True          # mark as initialized
        self.import_default_modules()           # import the default modules
        self.on_worker_init()

import_default_modules looks like this:

def import_default_modules(self):
    signals.import_modules.send(sender=self.app)    # notify import_modules receivers (observer pattern)
    return [
        self.import_task_module(m) for m in (
            tuple(self.builtin_modules) +
            tuple(maybe_list(self.app.conf.imports)) +
            tuple(maybe_list(self.app.conf.include))
        )                                           # import the modules the project needs
    ]
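Together, init_worker and import_default_modules form an "initialize once" guard around a list of imports assembled from three sources. Below is a minimal sketch with stdlib modules standing in for task modules. `Loader` and its constructor are hypothetical, and unlike the real init_worker, this version returns the imported modules so that it can be tested.

```python
import importlib


class Loader:
    """Hypothetical loader: an init-once guard plus module imports."""

    builtin_modules = ('json',)           # stand-in for the loader's builtin modules

    def __init__(self, imports=(), include=()):
        self.imports = tuple(imports)     # plays the role of app.conf.imports
        self.include = tuple(include)     # plays the role of app.conf.include
        self.worker_initialized = False

    def init_worker(self):
        if not self.worker_initialized:   # only the first call does any work
            self.worker_initialized = True
            return self.import_default_modules()
        return []

    def import_default_modules(self):
        names = self.builtin_modules + self.imports + self.include
        return [importlib.import_module(name) for name in names]


loader = Loader(imports=['collections'], include=['math'])
print([m.__name__ for m in loader.init_worker()])  # ['json', 'collections', 'math']
print(loader.init_worker())                        # []
```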

Next, look at self.start_scheduler(), which starts the periodic-task scheduler:

def start_scheduler(self):
    if self.pidfile:                                    # was a pidfile configured?
        platforms.create_pidlock(self.pidfile)          # create the pid lock file
    service = self.Service(
        app=self.app,
        max_interval=self.max_interval,
        scheduler_cls=self.scheduler_cls,
        schedule_filename=self.schedule,
    )                                                   # build the Service instance
    print(self.banner(service))                         # print startup information
    self.setup_logging()                                # configure logging
    if self.socket_timeout:
        logger.debug('Setting default socket timeout to %r',
                     self.socket_timeout)
        socket.setdefaulttimeout(self.socket_timeout)   # set the default socket timeout
    try:
        self.install_sync_handler(service)              # install the sync handler
        service.start()                                 # start the service
    except Exception as exc:
        logger.critical('beat raised exception %s: %r',
                        exc.__class__, exc,
                        exc_info=True)
        raise

Here, Service is celery.beat.Service. Once it is constructed, its start method is called:

def start(self, embedded_process=False):
    info('beat: Starting...')
    debug('beat: Ticking with max interval->%s',
          humanize_seconds(self.scheduler.max_interval))    # log the maximum sleep interval
    signals.beat_init.send(sender=self)                     # notify beat_init receivers
    if embedded_process:
        signals.beat_embedded_init.send(sender=self)
        platforms.set_process_title('celery beat')
    try:
        while not self._is_shutdown.is_set():               # loop until the shutdown Event is set
            interval = self.scheduler.tick()                # run one tick; returns how long to sleep
            if interval and interval > 0.0:                 # sleep only for a positive interval
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                time.sleep(interval)                        # sleep
                if self.scheduler.should_sync():            # time to sync the schedule?
                    self.scheduler._do_sync()               # sync it
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()
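Stripped of logging, signals and syncing, start reduces to one loop: call tick(), sleep for whatever positive interval it returns, and stop once a threading.Event is set. Below is a minimal sketch of that loop. `run_service` and `fake_tick` are hypothetical, and sleep is injectable so the demo does not actually wait.

```python
import threading
import time


def run_service(tick, is_shutdown, sleep=time.sleep):
    """Simplified beat loop: tick, then sleep for the interval tick() asked for."""
    try:
        while not is_shutdown.is_set():
            interval = tick()
            if interval and interval > 0.0:
                sleep(interval)
    except (KeyboardInterrupt, SystemExit):
        is_shutdown.set()


# Demo: a fake tick() returns a few intervals, then requests shutdown.
intervals = [0, 0.01, 0, 0.02]
slept = []
shutdown = threading.Event()


def fake_tick():
    if not intervals:
        shutdown.set()      # stand-in for Ctrl+C or service.stop()
        return 0
    return intervals.pop(0)


run_service(fake_tick, shutdown, sleep=slept.append)
print(slept)   # [0.01, 0.02]
```

A return value of 0 means "call tick() again right away". This is what tick uses after it has just sent a task.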

Evaluating self.scheduler.max_interval in start accesses the service's scheduler property:

@cached_property
def scheduler(self):
    return self.get_scheduler()          # build the scheduler on first access

This calls the service's get_scheduler method:

def get_scheduler(self, lazy=False,
                  extension_namespace='celery.beat_schedulers'):
    filename = self.schedule_filename                           # defaults to celerybeat-schedule
    aliases = dict(
        load_extension_class_names(extension_namespace) or {})  # scheduler aliases registered under celery.beat_schedulers
    return symbol_by_name(self.scheduler_cls, aliases=aliases)(
        app=self.app,
        schedule_filename=filename,
        max_interval=self.max_interval,
        lazy=lazy,
    )                                                           # with -S django this is django_celery_beat.schedulers:DatabaseScheduler

load_extension_class_names is defined as:

def load_extension_class_names(namespace):
    try:
        from pkg_resources import iter_entry_points
    except ImportError:  # pragma: no cover
        return
    for ep in iter_entry_points(namespace):
        yield ep.name, ':'.join([ep.module_name, ep.attrs[0]])

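This function looks up the entry points registered under the celery.beat_schedulers group. django_celery_beat declares them in its site-packages/django_celery_beat-1.0.1.dist-info directory (in entry_points.txt), with the following content: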

[celery.beat_schedulers]
django = django_celery_beat.schedulers:DatabaseScheduler

So the alias django resolves to django_celery_beat.schedulers:DatabaseScheduler, and that class is imported and instantiated.
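Resolution happens in two steps. First the alias is mapped to a "module:attr" path, then that path is imported. Below is a minimal sketch that parses the entry-point text above with configparser and resolves paths with importlib. `load_aliases` and `resolve` are hypothetical helpers, and `resolve` is only a rough analogue of symbol_by_name. Resolving `django` would require django_celery_beat to be installed, so the import step is demonstrated on a stdlib path.

```python
import configparser
import importlib

# The entry-point declaration shown above.
ENTRY_POINTS_TXT = """
[celery.beat_schedulers]
django = django_celery_beat.schedulers:DatabaseScheduler
"""


def load_aliases(text, group):
    """Map alias -> 'module:attr' for one entry-point group (simplified parser)."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return dict(parser[group]) if parser.has_section(group) else {}


def resolve(name, aliases):
    """Expand an alias if there is one, then import 'module:attr'."""
    path = aliases.get(name, name)
    module_name, _, attr = path.partition(':')
    return getattr(importlib.import_module(module_name), attr)


aliases = load_aliases(ENTRY_POINTS_TXT, 'celery.beat_schedulers')
print(aliases['django'])                             # django_celery_beat.schedulers:DatabaseScheduler
print(resolve('collections:OrderedDict', aliases))   # <class 'collections.OrderedDict'>
```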

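Once initialization is done, start reaches self.scheduler.tick(). DatabaseScheduler does not override tick, so the method called is the one it inherits from celery.beat.Scheduler: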

def tick(self, event_t=event_t, min=min,
         heappop=heapq.heappop, heappush=heapq.heappush,
         heapify=heapq.heapify, mktime=time.mktime):
    """Run a tick - one iteration of the scheduler.

    Executes one due task per call.

    Returns:
        float: preferred delay in seconds for next call.
    """
    def _when(entry, next_time_to_run):
        return (mktime(entry.schedule.now().timetuple()) +
                (adjust(next_time_to_run) or 0))

    adjust = self.adjust
    max_interval = self.max_interval                        # maximum sleep time
    H = self._heap                                          # the event heap
    if H is None:                                           # heap not built yet
        H = self._heap = [event_t(_when(e, e.is_due()[1]) or 0, 5, e)
                          for e in values(self.schedule)]   # load the scheduled entries
        heapify(H)                                          # turn the list into a priority heap
    if not H:                                               # nothing scheduled
        return max_interval                                 # sleep for the maximum interval

    event = H[0]                                            # peek at the earliest event
    entry = event[2]                                        # its entry, here a ModelEntry
    is_due, next_time_to_run = self.is_due(entry)           # is it due, and when to check next
    if is_due:                                              # the entry must run now
        verify = heappop(H)                                 # pop it from the heap
        if verify is event:                                 # the heap top is still the event we inspected
            next_entry = self.reserve(entry)                # entry for the next run
            self.apply_entry(entry, producer=self.producer) # send the periodic task
            heappush(H, event_t(_when(next_entry, next_time_to_run),
                                event[1], next_entry))      # push the next run onto the heap
            return 0                                        # ask to be called again immediately
        else:
            heappush(H, verify)                             # heap changed meanwhile: push it back
            return min(verify[0], max_interval)             # and sleep at most max_interval
    return min(adjust(next_time_to_run) or max_interval, max_interval)  # sleep until the next run, capped at max_interval
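At its core, tick is a min-heap keyed on each entry's next run time. It peeks at the top. If that entry is due, it sends the task, pushes the next occurrence and returns 0. Otherwise it returns the time remaining, capped at max_interval. Below is a minimal sketch of that logic. `HeapScheduler` is hypothetical and makes several simplifications: entries are (name, interval_seconds) pairs, the clock is injectable, each entry first runs one interval after start, and the `verify is event` check and reserve() are omitted.

```python
import heapq
from collections import namedtuple

# (time, priority, entry), mirroring the event_t tuples kept in the scheduler's heap.
Event = namedtuple('Event', ('time', 'priority', 'entry'))


class HeapScheduler:
    """Sketch of the tick() heap logic; entries are (name, interval_seconds)."""

    def __init__(self, entries, now, max_interval=5.0):
        self.now = now                      # injectable clock, for testing
        self.max_interval = max_interval
        self.sent = []                      # names "sent" to the broker
        self.heap = [Event(now() + every, 5, (name, every)) for name, every in entries]
        heapq.heapify(self.heap)

    def tick(self):
        if not self.heap:
            return self.max_interval
        event = self.heap[0]                        # earliest deadline
        remaining = event.time - self.now()
        if remaining <= 0:                          # due: send it, then reschedule
            heapq.heappop(self.heap)
            name, every = event.entry
            self.sent.append(name)
            heapq.heappush(self.heap, Event(self.now() + every, event.priority, event.entry))
            return 0                                # call tick() again right away
        return min(remaining, self.max_interval)    # never sleep past max_interval


clock = [0.0]
sched = HeapScheduler([('beat_task', 3.0), ('hourly', 3600.0)], now=lambda: clock[0])
print(sched.tick())   # 3.0
clock[0] = 3.0
print(sched.tick())   # 0
print(sched.sent)     # ['beat_task']
```

Capping the sleep at max_interval lets beat pick up schedule changes even when the next task is far away.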

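When DatabaseScheduler is initialized, it calls the constructor of its parent class Scheduler, and that constructor calls setup_schedule. DatabaseScheduler overrides setup_schedule as follows: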

def setup_schedule(self):
    self.install_default_entries(self.schedule)
    self.update_from_dict(self.app.conf.beat_schedule)

This turns the registered periodic tasks into entries, and the Entry class used here is ModelEntry. install_default_entries is:

def install_default_entries(self, data):
    entries = {}
    if self.app.conf.result_expires:
        entries.setdefault(
            'celery.backend_cleanup', {
                'task': 'celery.backend_cleanup',
                'schedule': schedules.crontab('0', '4', '*'),
                'options': {'expires': 12 * 3600},
            },
        )
    self.update_from_dict(entries)  # write the entries to the database

self.update_from_dict saves the configured periodic tasks to the database:

def update_from_dict(self, mapping):
    s = {}
    for name, entry in items(mapping):
        try:
            s[name] = self.Entry.from_entry(name, app=self.app, **entry)
        except Exception as exc:
            logger.error(ADD_ENTRY_ERROR, name, exc, entry)
    self.schedule.update(s)         # merge the new entries into the schedule
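Two points stand out in install_default_entries and update_from_dict. The cleanup entry is added only when result expiry is enabled. Also, a malformed entry is logged and skipped rather than aborting startup. Below is a minimal in-memory sketch of both behaviors. `Entry` and the two functions are hypothetical, the crontab is a placeholder string, and nothing is written to a database here, whereas django_celery_beat's from_entry does persist the rows.

```python
import logging

logger = logging.getLogger(__name__)


class Entry:
    """Hypothetical entry type; from_entry fails if 'task' is missing."""

    def __init__(self, name, task, schedule, options=None):
        self.name, self.task, self.schedule = name, task, schedule
        self.options = options or {}

    @classmethod
    def from_entry(cls, name, **entry):
        return cls(name, **entry)


def update_from_dict(schedule, mapping):
    """Build entries one at a time; a bad entry is logged and skipped."""
    built = {}
    for name, entry in mapping.items():
        try:
            built[name] = Entry.from_entry(name, **entry)
        except Exception as exc:
            logger.error('Cannot add entry %r: %r. Contents: %r', name, exc, entry)
    schedule.update(built)


def install_default_entries(schedule, result_expires):
    """Add the daily cleanup entry only when result expiry is enabled."""
    entries = {}
    if result_expires:
        entries['celery.backend_cleanup'] = {
            'task': 'celery.backend_cleanup',
            'schedule': 'daily at 04:00',   # placeholder for crontab('0', '4', '*')
            'options': {'expires': 12 * 3600},
        }
    update_from_dict(schedule, entries)


schedule = {}
install_default_entries(schedule, result_expires=86400)
update_from_dict(schedule, {
    'test_beat': {'task': 'celery_app.tasks.beat_task', 'schedule': 3},
    'broken': {'schedule': 3},          # no 'task' key: logged and skipped
})
print(sorted(schedule))   # ['celery.backend_cleanup', 'test_beat']
```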

The schedule property that update_from_dict relies on is:

@property
def schedule(self):
    update = False
    if not self._initial_read:
        debug('DatabaseScheduler: initial read')
        update = True
        self._initial_read = True
    elif self.schedule_changed():
        info('DatabaseScheduler: Schedule changed.')
        update = True

    if update:
        self.sync()
        self._schedule = self.all_as_schedule()
        if logger.isEnabledFor(logging.DEBUG):
            debug('Current schedule:\n%s', '\n'.join(
                repr(entry) for entry in values(self._schedule)),
            )
    return self._schedule
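The schedule property reloads from the database only on the first read or after a change has been detected. Every other access returns the cached dict. Below is a minimal sketch of that lazy-reload pattern. `DbBackedSchedule` is hypothetical: a dict stands in for the PeriodicTask rows, and a version counter replaces the last-change timestamp that django_celery_beat checks. The `self.sync()` call before reloading is omitted.

```python
class DbBackedSchedule:
    """Sketch of the lazy reload in DatabaseScheduler.schedule (in-memory 'db')."""

    def __init__(self, db):
        self.db = db                  # name -> row dict; stands in for PeriodicTask rows
        self.version = 0              # bumped on every write; stands in for the change tracker
        self._seen_version = None
        self._initial_read = False
        self._schedule = {}
        self.reloads = 0

    def write(self, name, **fields):
        self.db[name] = fields
        self.version += 1

    def schedule_changed(self):
        return self._seen_version != self.version

    def all_as_schedule(self):
        # Only enabled rows become entries, like Model.objects.enabled().
        return {name: row for name, row in self.db.items() if row.get('enabled', True)}

    @property
    def schedule(self):
        update = False
        if not self._initial_read:
            update = self._initial_read = True
        elif self.schedule_changed():
            update = True
        if update:
            self._seen_version = self.version
            self._schedule = self.all_as_schedule()
            self.reloads += 1
        return self._schedule


s = DbBackedSchedule({'test_beat': {'enabled': True}})
first = s.schedule                      # initial read: loads from the "database"
second = s.schedule                     # unchanged: cached dict returned
s.write('off_task', enabled=False)
print(sorted(s.schedule), s.reloads)    # ['test_beat'] 2
```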

The rows saved in the database are read back by all_as_schedule:

def all_as_schedule(self):
    debug('DatabaseScheduler: Fetching database schedule')
    s = {}
    for model in self.Model.objects.enabled():
        try:
            s[model.name] = self.Entry(model, app=self.app)
        except ValueError:
            pass
    return s

It fetches the enabled periodic tasks from the database, wraps each one in a ModelEntry and returns them. The is_due called from self.scheduler.tick() (inside start) is simply:

def is_due(self, entry):
    return entry.is_due()  # ask the entry whether it is due

Here entry is a ModelEntry instance, so ModelEntry.is_due is called:

def is_due(self):
    if not self.model.enabled:
        # Task disabled: report "not due" and check again later.
        return False, 5.0   # 5 second delay for re-enable.
    return self.schedule.is_due(self.last_run_at)  # delegate to the schedule object's is_due

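self.schedule here is the entry's schedule object, not the scheduler. For an interval (timedelta) schedule like the one configured above, that object is a celery.schedules.schedule, and its is_due is: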

def is_due(self, last_run_at):
    """Return tuple of ``(is_due, next_time_to_check)``.

    Notes:
        - next time to check is in seconds.

        - ``(True, 20)``, means the task should be run now, and the next
            time to check is in 20 seconds.

        - ``(False, 12.3)``, means the task is not due, but that the
          scheduler should check again in 12.3 seconds.

    The next time to check is used to save energy/CPU cycles,
    it does not need to be accurate but will influence the precision
    of your schedule.  You must also keep in mind
    the value of :setting:`beat_max_loop_interval`,
    that decides the maximum number of seconds the scheduler can
    sleep between re-checking the periodic task intervals.  So if you
    have a task that changes schedule at run-time then your next_run_at
    check will decide how long it will take before a change to the
    schedule takes effect.  The max loop interval takes precedence
    over the next check at value returned.

    .. admonition:: Scheduler max interval variance

        The default max loop interval may vary for different schedulers.
        For the default scheduler the value is 5 minutes, but for example
        the :pypi:`django-celery-beat` database scheduler the value
        is 5 seconds.
    """
    last_run_at = self.maybe_make_aware(last_run_at)
    rem_delta = self.remaining_estimate(last_run_at)
    remaining_s = max(rem_delta.total_seconds(), 0)
    if remaining_s == 0:
        return schedstate(is_due=True, next=self.seconds)
    return schedstate(is_due=False, next=remaining_s)

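It computes how much time remains before the next run. If the remaining time is 0, the task is due now, and the next check is one full interval (self.seconds) away. Otherwise it returns the remaining seconds.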
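For a fixed interval, the calculation amounts to remaining = (last_run_at + run_every) - now, clamped at zero. Below is a minimal sketch using naive datetimes. `interval_is_due` is a hypothetical helper, and the timezone handling done by maybe_make_aware is left out.

```python
from datetime import datetime, timedelta


def interval_is_due(last_run_at, now, run_every):
    """Return (is_due, next_check_seconds) for a fixed interval schedule."""
    remaining = (last_run_at + run_every) - now         # like remaining_estimate()
    remaining_s = max(remaining.total_seconds(), 0)
    if remaining_s == 0:
        return True, run_every.total_seconds()          # due now; next check one interval away
    return False, remaining_s                           # not due; check again when it will be


last = datetime(2018, 7, 12, 7, 24, 54)
every = timedelta(seconds=3)
print(interval_is_due(last, last + timedelta(seconds=1), every))  # (False, 2.0)
print(interval_is_due(last, last + timedelta(seconds=4), every))  # (True, 3.0)
```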

When an entry is due, tick() (called from the start loop) invokes self.apply_entry(entry, producer=self.producer):

def apply_entry(self, entry, producer=None):
    info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
    try:
        result = self.apply_async(entry, producer=producer, advance=False)   # send the task asynchronously
    except Exception as exc:  # pylint: disable=broad-except
        error('Message Error: %s\n%s',
              exc, traceback.format_stack(), exc_info=True)
    else:
        debug('%s sent. id->%s', entry.task, result.id)                      # log the task id

This in turn calls self.apply_async:

def apply_async(self, entry, producer=None, advance=True, **kwargs):
    # Update time-stamps and run counts before we actually execute,
    # so we have that done if an exception is raised (doesn't schedule
    # forever.)
    entry = self.reserve(entry) if advance else entry       # advance the entry if requested
    task = self.app.tasks.get(entry.task)                   # look the task up in the app registry

    try:
        if task:                                            # task registered in this app
            return task.apply_async(entry.args, entry.kwargs,
                                    producer=producer,
                                    **entry.options)        # send through the task object
        else:
            return self.send_task(entry.task, entry.args, entry.kwargs,
                                  producer=producer,
                                  **entry.options)          # send by name through the app
    except Exception as exc:  # pylint: disable=broad-except
        reraise(SchedulingError, SchedulingError(
            "Couldn't apply scheduled task {0.name}: {exc}".format(
                entry, exc=exc)), sys.exc_info()[2])
    finally:
        self._tasks_since_sync += 1
        if self.should_sync():
            self._do_sync()

The task is now sent to the broker, where it waits for a consumer. This completes the overall flow of a periodic task.
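apply_async has two send paths. If the task is registered in beat's own app, it goes through the task object. Otherwise it is sent by name, so beat can schedule tasks it cannot import. In both cases the sync counter advances in a finally block. Below is a minimal sketch of that control flow with no broker. `Dispatcher` and `sync_after` are hypothetical, a simple count threshold stands in for should_sync(), and `other_project.tasks.remote` is a made-up task name.

```python
class Dispatcher:
    """Sketch of the send paths in Scheduler.apply_async (hypothetical, no broker)."""

    def __init__(self, registered, sync_after=2):
        self.registered = set(registered)   # task names known to this app, like app.tasks
        self.sent = []                      # records which path each send took
        self.sync_after = sync_after        # count threshold standing in for should_sync()
        self._tasks_since_sync = 0
        self.syncs = 0

    def apply_async(self, task_name):
        try:
            if task_name in self.registered:
                self.sent.append(('task.apply_async', task_name))  # task object available locally
            else:
                self.sent.append(('app.send_task', task_name))     # only the name is known
        finally:
            # Runs even if sending raised, so the counter always advances.
            self._tasks_since_sync += 1
            if self._tasks_since_sync >= self.sync_after:
                self.syncs += 1                 # stand-in for _do_sync()
                self._tasks_since_sync = 0


d = Dispatcher(['celery_app.tasks.beat_task'])
d.apply_async('celery_app.tasks.beat_task')
d.apply_async('other_project.tasks.remote')
print(d.sent)
print(d.syncs)   # 1
```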

Summary

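This article walked through the overall execution flow of periodic tasks. In this setup (django_celery_beat, started with -S django), the schedule is stored in the database. Beat checks whether each entry is due and sends due tasks to the broker, where a worker picks them up and executes them. Many details were left out. Interested readers can explore them on their own.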

Reposted from: https://blog.csdn.net/qq_33339479/article/details/81019668
