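Celery source code analysis
Environment for this article: Python 3.5.2, Celery 4.0.2, Django 1.10.x
Celery periodic tasks and Django configuration
Celery can also run periodic (scheduled) tasks. To set this up together with Django, configure the project as follows: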
1. Add the following task to celery_app.tasks:

    from celery import shared_task

    @shared_task
    def beat_task():
        print("beat_task")
2. Add the following configuration to the celery_django.settings file:
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'celery_app',
        'django_celery_beat'
    ]
This is needed because Celery relies on the third-party library django_celery_beat when it works together with Django, so that app has to be registered.
3. Add the following configuration to the Django project's celery.py file:
    from celery_django import settings
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

    from datetime import timedelta

    CELERYBEAT_SCHEDULE = {
        'test_beat': {
            'task': 'celery_app.tasks.beat_task',
            'schedule': timedelta(seconds=3),
        },
    }
    app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)
The configuration is now complete. First, start the beat scheduler with this command:
(venv) wuzideMacBook-Air:celery_django wuzi$ celery beat -A celery_django -S django
Then start the worker, which consumes the tasks whose scheduled time has arrived:
celery -A celery_django worker
The worker's terminal then prints output like the following:
     -------------- celery@wuzideMacBook-Air.local v4.0.0 (latentcall)
    ---- **** -----
    --- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2018-07-12 07:24:56
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         celery_django:0x108934240
    - ** ---------- .> transport:   redis://127.0.0.1:6379/7
    - ** ---------- .> results:     redis://127.0.0.1:6379/6
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery

    [2018-07-12 07:24:57,512: WARNING/MainProcess] /Users/wuzi/python35/venv/lib/python3.5/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
      warnings.warn('Using settings.DEBUG leads to a memory leak, never '
    [2018-07-12 07:24:57,546: WARNING/PoolWorker-3] beat_task
    [2018-07-12 07:24:57,550: WARNING/PoolWorker-4] beat_task
    [2018-07-12 07:24:57,551: WARNING/PoolWorker-2] beat_task
    [2018-07-12 07:24:57,560: WARNING/PoolWorker-1] beat_task
Analysis of Celery periodic tasks
From inside the celery_django directory, enter the following command in a terminal:
celery beat -A celery_django -S django
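As analyzed in the earlier articles, the command is looked up in the commands mapping of CeleryCommand, which maps beat to the beat class. That class is then executed through the same code as before: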
    return cls(
        app=self.app, on_error=self.on_error,
        no_color=self.no_color, quiet=self.quiet,
        on_usage_error=partial(self.on_usage_error, command=command),
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])  # instantiate the class, then call run_from_argv
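Here cls is the beat class. Looking at the beat class in bin/beat.py, it only overrides the run and add_arguments methods, so the run_from_argv that gets executed is the one beat inherits from Command. That method calls Command's handle_argv, which processes the arguments and then calls the __call__ method, which in turn calls run. The run that is invoked is the one overridden in the beat class: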
    def run(self, detach=False, logfile=None, pidfile=None,
            uid=None, gid=None, umask=None, workdir=None, **kwargs):
        if not detach:  # not running in the background
            maybe_drop_privileges(uid=uid, gid=gid)
        kwargs.pop('app', None)  # drop the app argument that was passed in
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)  # partial that fixes the default arguments for the class

        if detach:  # run in the background
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run()  # background run
        else:
            return beat().run()  # foreground run
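The dispatch chain just described (run_from_argv, then handle_argv, then __call__, then the overridden run) is a template-method pattern. The following is only a toy sketch of that pattern, not Celery's actual Command class: the option parsing is a simplified assumption and the classes below are hypothetical stand-ins.

```python
class Command:
    """Toy stand-in for a base command class (not Celery's real one)."""

    def run_from_argv(self, prog_name, argv, command=None):
        # Entry point: hand the raw arguments to handle_argv.
        return self.handle_argv(prog_name, argv, command)

    def handle_argv(self, prog_name, argv, command=None):
        # Simplified parsing: turn "--key=value" into keyword arguments.
        options = {}
        for arg in argv:
            key, _, value = arg.lstrip('-').partition('=')
            options[key] = value
        return self(**options)

    def __call__(self, **kwargs):
        # Calling the command object finally reaches run().
        return self.run(**kwargs)

    def run(self, **kwargs):
        raise NotImplementedError


class beat(Command):
    # Only run() is overridden; everything else is inherited.
    def run(self, detach=False, logfile=None, **kwargs):
        return 'beat.run(detach={0!r}, logfile={1!r})'.format(detach, logfile)


result = beat().run_from_argv('celery', ['--logfile=beat.log'], command='beat')
print(result)
```

Because the subclass overrides only run, the inherited entry point still ends up in the subclass's own logic.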
This accesses the Beat property of the app:
    @cached_property
    def Beat(self, **kwargs):
        """:program:`celery beat` scheduler application.

        See Also:
            :class:`~@Beat`.
        """
        return self.subclass_with_self('celery.apps.beat:Beat')  # import the celery.apps.beat:Beat class
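The idea behind subclass_with_self can be illustrated with a small sketch: build a subclass on the fly whose class attribute points back at the app. This is an assumption-laden simplification (the App and Beat classes below are hypothetical, and a plain property is used instead of cached_property), not the library's implementation.

```python
class Beat:
    app = None  # filled in by the app-bound subclass

    def __init__(self, logfile=None):
        self.logfile = logfile


class App:
    def __init__(self, name):
        self.name = name

    def subclass_with_self(self, cls, attribute='app'):
        # Create a subclass whose class attribute refers to this app.
        return type(cls.__name__, (cls,), {attribute: self})

    @property
    def Beat(self):
        # A plain property for brevity; a cached property would build
        # the subclass only once.
        return self.subclass_with_self(Beat)


app = App('celery_django')
beat = app.Beat(logfile='beat.log')
print(beat.app.name, isinstance(beat, Beat))
```

Instances created through app.Beat can reach the app through self.app without it being passed to the constructor, while the original Beat class is left unchanged.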
This instantiates the Beat class from celery.apps.beat and calls the run method of that instance:
    def run(self):
        print(str(self.colored.cyan(
            'celery beat v{0} is starting.'.format(VERSION_BANNER))))
        self.init_loader()  # load through the Loader
        self.set_process_title()
        self.start_scheduler()  # start task scheduling
The init_loader method in turn calls the init_worker method of the BaseLoader class in celery/loaders/base.py:
    def init_worker(self):
        if not self.worker_initialized:  # not initialized yet
            self.worker_initialized = True  # mark as initialized
            self.import_default_modules()  # import the default modules
            self.on_worker_init()
The import_default_modules function looks like this:
    def import_default_modules(self):
        signals.import_modules.send(sender=self.app)  # signal (observer pattern) that notifies listeners to import apps
        return [
            self.import_task_module(m) for m in (
                tuple(self.builtin_modules) +
                tuple(maybe_list(self.app.conf.imports)) +
                tuple(maybe_list(self.app.conf.include))
            )  # import the modules the project needs
        ]
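The core of this step is concatenating several lists of module names and importing each one by name. A minimal sketch of that idea, using only the standard library and standard-library module names as hypothetical placeholders for task modules:

```python
import importlib


def import_default_modules(builtin_modules, imports, include):
    # Concatenate the three sources of module names, preserving order,
    # and import every module by its dotted name.
    names = tuple(builtin_modules) + tuple(imports) + tuple(include)
    return [importlib.import_module(name) for name in names]


# Placeholder module names; a real project would list its task modules.
modules = import_default_modules(['json'], ['heapq'], ['datetime'])
names = [module.__name__ for module in modules]
print(names)
```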
Next comes self.start_scheduler(), the function that actually starts the periodic task scheduler:
    def start_scheduler(self):
        if self.pidfile:  # check whether a pidfile is configured
            platforms.create_pidlock(self.pidfile)  # create the pid file
        service = self.Service(
            app=self.app,
            max_interval=self.max_interval,
            scheduler_cls=self.scheduler_cls,
            schedule_filename=self.schedule,
        )  # initialize the service class

        print(self.banner(service))  # print the startup information

        self.setup_logging()  # set up logging
        if self.socket_timeout:
            logger.debug('Setting default socket timeout to %r',
                         self.socket_timeout)
            socket.setdefaulttimeout(self.socket_timeout)  # set the connection timeout
        try:
            self.install_sync_handler(service)  # register the handler
            service.start()  # start
        except Exception as exc:
            logger.critical('beat raised exception %s: %r',
                            exc.__class__, exc,
                            exc_info=True)
            raise
Here the Service class is beat.Service, and the start method of the service is called:
    def start(self, embedded_process=False):
        info('beat: Starting...')
        debug('beat: Ticking with max interval->%s',
              humanize_seconds(self.scheduler.max_interval))  # log the maximum interval

        signals.beat_init.send(sender=self)  # notify functions registered for this signal
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():  # check whether the Event has been set
                interval = self.scheduler.tick()  # ask scheduler.tick() how long until the next task is due
                if interval and interval > 0.0:  # greater than 0
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    time.sleep(interval)  # sleep
                    if self.scheduler.should_sync():  # should the data be synced?
                        self.scheduler._do_sync()  # sync the data
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()
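The loop in start is a tick-and-sleep loop guarded by a threading.Event. Here is a self-contained sketch of that control flow. FakeScheduler and the max_ticks cutoff are hypothetical helpers added so the example terminates; in the real service the loop ends when the shutdown event is set from elsewhere or on KeyboardInterrupt/SystemExit.

```python
import threading
import time


class FakeScheduler:
    """Hypothetical scheduler that returns preset delays from tick()."""

    def __init__(self, delays):
        self.delays = list(delays)
        self.ticks = 0

    def tick(self):
        self.ticks += 1
        return self.delays.pop(0) if self.delays else 0.0


def run_loop(scheduler, shutdown, max_ticks):
    slept = []
    while not shutdown.is_set():
        interval = scheduler.tick()
        if interval and interval > 0.0:
            # Nothing is due yet: sleep for the suggested delay.
            slept.append(interval)
            time.sleep(interval)
        if scheduler.ticks >= max_ticks:
            shutdown.set()  # stand-in for an external shutdown request
    return slept


shutdown = threading.Event()
scheduler = FakeScheduler([0.01, 0, 0.02])
slept = run_loop(scheduler, shutdown, max_ticks=3)
print(slept)
```

A tick that returns 0 means a task was just sent, so the loop immediately ticks again without sleeping.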
Evaluating self.scheduler.max_interval accesses the scheduler property of the service:
    @cached_property
    def scheduler(self):
        return self.get_scheduler()  # get the scheduler
which calls the get_scheduler function of the service:
    def get_scheduler(self, lazy=False,
                      extension_namespace='celery.beat_schedulers'):
        filename = self.schedule_filename  # defaults to celerybeat-schedule
        aliases = dict(
            load_extension_class_names(extension_namespace) or {})  # load the classes registered under celery.beat_schedulers
        return symbol_by_name(self.scheduler_cls, aliases=aliases)(
            app=self.app,
            schedule_filename=filename,
            max_interval=self.max_interval,
            lazy=lazy,
        )  # with -S django this is the django_celery_beat.schedulers:DatabaseScheduler class
load_extension_class_names is defined as follows:
    def load_extension_class_names(namespace):
        try:
            from pkg_resources import iter_entry_points
        except ImportError:  # pragma: no cover
            return

        for ep in iter_entry_points(namespace):
            yield ep.name, ':'.join([ep.module_name, ep.attrs[0]])
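This function looks up the entry points registered under celery.beat_schedulers. The entry-point configuration lives in the site-packages/django_celery_beat-1.0.1.dist-info metadata, with the following content: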
    [celery.beat_schedulers]
    django = django_celery_beat.schedulers:DatabaseScheduler
The class that is resolved, imported and returned is therefore django_celery_beat.schedulers:DatabaseScheduler.
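Resolving a short alias such as django to a module:attribute path and then importing it can be sketched with the standard library alone. The resolve function and the alias table below are hypothetical illustrations of the idea, not the symbol_by_name implementation:

```python
import importlib


def resolve(name, aliases=None):
    """Resolve an alias or a 'module:attr' string to the object it names."""
    # Replace a known alias with its full 'module:attr' path.
    name = (aliases or {}).get(name, name)
    module_name, _, attr = name.partition(':')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Hypothetical alias table, shaped like the entry-point listing above.
aliases = {'ordered': 'collections:OrderedDict'}

cls_from_alias = resolve('ordered', aliases)
cls_from_path = resolve('collections:OrderedDict', aliases)
print(cls_from_alias is cls_from_path)
```

This is why -S django on the command line is enough: the alias is expanded to the full path before the class is imported.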
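Once initialization is complete, execution reaches self.scheduler.tick(). The tick function used by DatabaseScheduler (inherited from its parent class Scheduler) is: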
    def tick(self, event_t=event_t, min=min,
             heappop=heapq.heappop, heappush=heapq.heappush,
             heapify=heapq.heapify, mktime=time.mktime):
        """Run a tick - one iteration of the scheduler.

        Executes one due task per call.

        Returns:
            float: preferred delay in seconds for next call.
        """
        def _when(entry, next_time_to_run):
            return (mktime(entry.schedule.now().timetuple()) +
                    (adjust(next_time_to_run) or 0))

        adjust = self.adjust
        max_interval = self.max_interval  # maximum sleep time
        H = self._heap  # the heap
        if H is None:  # the heap has not been built yet
            H = self._heap = [event_t(_when(e, e.is_due()[1]) or 0, 5, e)
                              for e in values(self.schedule)]  # load the scheduled tasks into the heap
            heapify(H)  # turn the list into a priority heap
        if not H:  # no entries
            return max_interval  # return the maximum sleep time

        event = H[0]  # first item in the heap
        entry = event[2]  # the entry to trigger, here a ModelEntry
        is_due, next_time_to_run = self.is_due(entry)  # is the ModelEntry due, and when should it next be checked
        if is_due:  # the task should run
            verify = heappop(H)  # pop the task
            if verify is event:  # confirm the popped item is the event we inspected
                next_entry = self.reserve(entry)  # entry for the next run of this task
                self.apply_entry(entry, producer=self.producer)  # send the periodic task
                heappush(H, event_t(_when(next_entry, next_time_to_run),
                                    event[1], next_entry))  # push the next run onto the heap
                return 0
            else:
                heappush(H, verify)  # not the same event: push it back
                return min(verify[0], max_interval)
        return min(adjust(next_time_to_run) or max_interval, max_interval)  # next run time, capped at the maximum sleep time
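The heap-based scheduling in tick can be tried out with a simulated clock. This is a simplified sketch under stated assumptions: the Entry class is hypothetical, time is an integer counter rather than wall-clock time, and each heap item carries a unique index as a tie-breaker so entries are never compared directly.

```python
import heapq


class Entry:
    """Hypothetical interval entry: run every `every` seconds."""

    def __init__(self, name, every, last_run):
        self.name, self.every, self.last_run = name, every, last_run

    def is_due(self, now):
        remaining = max(self.last_run + self.every - now, 0)
        if remaining == 0:
            return True, self.every
        return False, remaining


def tick(heap, now, sent, max_interval=5):
    if not heap:
        return max_interval
    when, index, entry = heap[0]           # peek at the earliest event
    due, next_delay = entry.is_due(now)
    if due:
        heapq.heappop(heap)
        sent.append((now, entry.name))     # stand-in for sending the task
        entry.last_run = now
        heapq.heappush(heap, (now + next_delay, index, entry))
        return 0                           # tick again immediately
    return min(next_delay, max_interval)   # sleep until the next check


entries = [Entry('a', 3, 0), Entry('b', 5, 0)]
heap = [(e.last_run + e.every, i, e) for i, e in enumerate(entries)]
heapq.heapify(heap)

now, sent = 0, []
while now <= 6:
    now += tick(heap, now, sent)           # advance the simulated clock
print(sent)
```

Only the head of the heap is inspected on each tick, so the cost per iteration does not grow with the number of entries that are not yet due.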
When DatabaseScheduler is initialized it calls the constructor of its parent class Scheduler, which calls the setup_schedule method. That method is defined in DatabaseScheduler:
    def setup_schedule(self):
        self.install_default_entries(self.schedule)
        self.update_from_dict(self.app.conf.beat_schedule)
This instantiates the registered periodic tasks as entries; the Entry class used here is ModelEntry:
    def install_default_entries(self, data):
        entries = {}
        if self.app.conf.result_expires:
            entries.setdefault(
                'celery.backend_cleanup', {
                    'task': 'celery.backend_cleanup',
                    'schedule': schedules.crontab('0', '4', '*'),
                    'options': {'expires': 12 * 3600},
                },
            )
        self.update_from_dict(entries)  # create the database records
self.update_from_dict saves the configured periodic task data to the database:
    def update_from_dict(self, mapping):
        s = {}
        for name, entry in items(mapping):
            try:
                s[name] = self.Entry.from_entry(name, app=self.app, **entry)
            except Exception as exc:
                logger.error(ADD_ENTRY_ERROR, name, exc, entry)
        self.schedule.update(s)  # update the values through schedule
The corresponding schedule property is:
    @property
    def schedule(self):
        update = False
        if not self._initial_read:
            debug('DatabaseScheduler: initial read')
            update = True
            self._initial_read = True
        elif self.schedule_changed():
            info('DatabaseScheduler: Schedule changed.')
            update = True

        if update:
            self.sync()
            self._schedule = self.all_as_schedule()
            if logger.isEnabledFor(logging.DEBUG):
                debug('Current schedule:\n%s', '\n'.join(
                    repr(entry) for entry in values(self._schedule)),
                )
        return self._schedule
It reads the values stored in the database back out through the all_as_schedule method:
    def all_as_schedule(self):
        debug('DatabaseScheduler: Fetching database schedule')
        s = {}
        for model in self.Model.objects.enabled():
            try:
                s[model.name] = self.Entry(model, app=self.app)
            except ValueError:
                pass
        return s
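This selects the tasks whose database record is marked as enabled, wraps each one in a ModelEntry instance, and returns them. The is_due function called inside self.scheduler.tick (which is itself called from start) is the following: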
    def is_due(self, entry):
        return entry.is_due()  # is the entry due to run?
The entry here is a ModelEntry instance, so this calls the is_due function of that class:
    def is_due(self):
        if not self.model.enabled:
            return False, 5.0  # 5 second delay for re-enable; a disabled task just returns the sleep time
        return self.schedule.is_due(self.last_run_at)  # delegate to the schedule's is_due check
This calls the is_due method of the entry's schedule object (the schedule class in celery/schedules.py):
    def is_due(self, last_run_at):
        """Return tuple of ``(is_due, next_time_to_check)``.

        Notes:
            - next time to check is in seconds.

            - ``(True, 20)``, means the task should be run now, and the next
                time to check is in 20 seconds.

            - ``(False, 12.3)``, means the task is not due, but that the
              scheduler should check again in 12.3 seconds.

        The next time to check is used to save energy/CPU cycles,
        it does not need to be accurate but will influence the precision
        of your schedule.  You must also keep in mind
        the value of :setting:`beat_max_loop_interval`,
        that decides the maximum number of seconds the scheduler can
        sleep between re-checking the periodic task intervals.  So if you
        have a task that changes schedule at run-time then your next_run_at
        check will decide how long it will take before a change to the
        schedule takes effect.  The max loop interval takes precedence
        over the next check at value returned.

        .. admonition:: Scheduler max interval variance

            The default max loop interval may vary for different schedulers.
            For the default scheduler the value is 5 minutes, but for example
            the :pypi:`django-celery-beat` database scheduler the value
            is 5 seconds.
        """
        last_run_at = self.maybe_make_aware(last_run_at)
        rem_delta = self.remaining_estimate(last_run_at)
        remaining_s = max(rem_delta.total_seconds(), 0)
        if remaining_s == 0:
            return schedstate(is_due=True, next=self.seconds)
        return schedstate(is_due=False, next=remaining_s)
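It computes how much time remains until the next run. If the remainder is 0, the task can run now; otherwise the remaining time is returned.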
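The arithmetic behind this check can be reproduced with plain datetime objects. The sketch below is a simplified standalone function, assuming naive datetimes and an explicit now argument (both assumptions made for illustration), applied to the 3-second interval from the test task:

```python
from datetime import datetime, timedelta


def is_due(run_every, last_run_at, now):
    # Time left until last_run_at + run_every, clamped at zero.
    remaining = max((last_run_at + run_every - now).total_seconds(), 0)
    if remaining == 0:
        # Due now; check again after one full interval.
        return True, run_every.total_seconds()
    # Not due; check again when the remaining time has elapsed.
    return False, remaining


run_every = timedelta(seconds=3)
last_run_at = datetime(2018, 7, 12, 7, 24, 54)

early = is_due(run_every, last_run_at, datetime(2018, 7, 12, 7, 24, 56))
on_time = is_due(run_every, last_run_at, datetime(2018, 7, 12, 7, 24, 57))
late = is_due(run_every, last_run_at, datetime(2018, 7, 12, 7, 25, 0))
print(early, on_time, late)
```

A task that is checked late is still reported as due, because the negative remainder is clamped to zero.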
When a task is due, the tick function (called from the loop in start) calls self.apply_entry(entry, producer=self.producer):
    def apply_entry(self, entry, producer=None):
        info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
        try:
            result = self.apply_async(entry, producer=producer, advance=False)  # send the asynchronous task
        except Exception as exc:  # pylint: disable=broad-except
            error('Message Error: %s\n%s',
                  exc, traceback.format_stack(), exc_info=True)
        else:
            debug('%s sent. id->%s', entry.task, result.id)  # log the task id
which in turn calls self.apply_async:
    def apply_async(self, entry, producer=None, advance=True, **kwargs):
        # Update time-stamps and run counts before we actually execute,
        # so we have that done if an exception is raised (doesn't schedule
        # forever.)
        entry = self.reserve(entry) if advance else entry  # get the entry
        task = self.app.tasks.get(entry.task)  # look up the task in the app

        try:
            if task:  # the task was found
                return task.apply_async(entry.args, entry.kwargs,
                                        producer=producer,
                                        **entry.options)  # send through the task itself
            else:
                return self.send_task(entry.task, entry.args, entry.kwargs,
                                      producer=producer,
                                      **entry.options)  # send directly through the app
        except Exception as exc:  # pylint: disable=broad-except
            reraise(SchedulingError, SchedulingError(
                "Couldn't apply scheduled task {0.name}: {exc}".format(
                    entry, exc=exc)), sys.exc_info()[2])
        finally:
            self._tasks_since_sync += 1
            if self.should_sync():
                self._do_sync()
The task is now sent and waits for a consumer to pick it up. That completes the overall flow of a periodic task.
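The two sending paths in apply_async (use the registered task object when it is known locally, otherwise send by name) amount to a lookup with a fallback. A toy sketch follows; Task, send_task and dispatch are hypothetical stand-ins that return strings instead of publishing messages, and the second task name is made up for the example.

```python
class Task:
    """Hypothetical registered task."""

    def __init__(self, name):
        self.name = name

    def apply_async(self, args, kwargs):
        return 'task.apply_async:{0}'.format(self.name)


def send_task(name, args, kwargs):
    # Fallback used when the task is not registered locally.
    return 'send_task:{0}'.format(name)


def dispatch(registry, task_name, args=(), kwargs=None):
    task = registry.get(task_name)
    if task:
        return task.apply_async(args, kwargs or {})
    return send_task(task_name, args, kwargs or {})


registry = {'celery_app.tasks.beat_task': Task('celery_app.tasks.beat_task')}
known = dispatch(registry, 'celery_app.tasks.beat_task')
unknown = dispatch(registry, 'other_app.tasks.remote_task')
print(known, unknown)
```

The fallback lets a scheduler process send a task by name even when the task's code is not imported in that process.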
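Summary
This article walked through the overall execution flow of periodic tasks. With the configuration used here, the periodic tasks are stored in the database; the scheduler repeatedly checks whether each task is due and sends the due ones to the broker, where they wait for a worker to execute them. Many details were not analyzed one by one; interested readers can explore them on their own.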
Reposted from: https://blog.csdn.net/qq_33339479/article/details/81019668