本文共 22617 字,大约阅读时间需要 75 分钟。
celery源码分析
本文环境python3.5.2,celery4.0.2,django1.10.x系列
celery的worker启动
在上文中分析到了Hub类的初始化,接下来继续分析Pool类的初始化,
class Pool(bootsteps.StartStopStep):
    """Bootstep managing the worker pool.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker start-up/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency
    """

    requires = (Hub,)

    ...  # the remaining members of the class are omitted from this excerpt

    def create(self, w):
        """Instantiate the worker pool, store it on ``w.pool`` and return it.

        Arguments:
            w: The worker being assembled.  Its pool-related settings
                (``pool_cls``, ``min_concurrency``, time limits, ...) are
                read, and ``w.process_task``, ``w.pool`` and -- in the
                non-threaded case -- ``w.semaphore``, ``w._quick_acquire``
                and ``w._quick_release`` are assigned.

        Returns:
            The pool object produced by ``self.instantiate``.
        """
        semaphore = None
        max_restarts = None
        # Warn when the ``worker_pool`` setting names one of the green
        # pools listed in GREEN_POOLS.
        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
            warnings.warn(UserWarning(W_POOL_SETTING))
        # Use the threaded variant when the worker has no event loop,
        # or when running on Windows.
        threaded = not w.use_eventloop or IS_WINDOWS
        procs = w.min_concurrency
        # Default task handler; may be replaced by the semaphore-aware
        # variant below.
        w.process_task = w._process_task
        if not threaded:
            # Semaphore sized to the number of pool processes; its
            # acquire/release are exposed on the worker as shortcuts.
            semaphore = w.semaphore = LaxBoundedSemaphore(procs)
            w._quick_acquire = w.semaphore.acquire
            w._quick_release = w.semaphore.release
            max_restarts = 100
            if w.pool_putlocks and w.pool_cls.uses_semaphore:
                w.process_task = w._process_task_sem
        allow_restart = w.pool_restarts
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        _set_task_join_will_block(pool.task_join_will_block)
        return pool
此时调用s.include方法会调用到Pool的create方法,该方法实例化了worker的执行池(pool_cls默认为prefork.TaskPool,即进程池)。
Beat类分析,此时继续执行会执行到beat类,
class Beat(bootsteps.StartStopStep):
    """Step used to embed a beat process.

    Enabled when the ``beat`` argument is set.
    """

    label = 'Beat'
    conditional = True

    def __init__(self, w, beat=False, **kwargs):
        """Record whether the step is enabled and clear ``w.beat``.

        Arguments:
            w: The worker being assembled.
            beat: Truthy to enable the embedded beat service.
            **kwargs: Forwarded to the parent step initializer.
        """
        self.enabled = w.beat = beat
        # ``w.beat`` is reset here; ``create`` assigns the service object.
        w.beat = None
        super(Beat, self).__init__(w, beat=beat, **kwargs)

    def create(self, w):
        """Create the embedded beat service, store it on ``w.beat``, return it.

        Raises:
            ImproperlyConfigured: If the pool class lives in a module whose
                name ends with ``gevent`` or ``eventlet``.
        """
        from celery.beat import EmbeddedService
        # The embedded beat service is refused for green pools.
        if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
            raise ImproperlyConfigured(ERR_B_GREEN)
        b = w.beat = EmbeddedService(w.app,
                                     schedule_filename=w.schedule_filename,
                                     scheduler_cls=w.scheduler)
        return b
此时beat类会根据传入的参数来确定是开启线程或者进程来执行定时任务。
Consumer类的加载过程如下,
class Consumer(bootsteps.StartStopStep):
    """Bootstep starting the Consumer blueprint."""

    last = True

    def create(self, w):
        """Instantiate the consumer, store it on ``w.consumer`` and return it.

        The initial prefetch count is a process count multiplied by
        ``w.prefetch_multiplier``.

        Arguments:
            w: The worker being assembled.

        Returns:
            The consumer object produced by ``self.instantiate``.
        """
        if w.max_concurrency:
            # A maximum concurrency is configured: base the count on
            # ``min_concurrency`` (at least 1).
            prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
        else:
            prefetch_count = w.concurrency * w.prefetch_multiplier
        c = w.consumer = self.instantiate(
            w.consumer_cls, w.process_task,
            hostname=w.hostname,
            task_events=w.task_events,
            init_callback=w.ready_callback,
            initial_prefetch_count=prefetch_count,
            pool=w.pool,
            timer=w.timer,
            app=w.app,
            controller=w,
            hub=w.hub,
            worker_options=w.options,
            disable_rate_limits=w.disable_rate_limits,
            prefetch_multiplier=w.prefetch_multiplier,
        )
        return c
此时加载Consumer类,即位于celery/worker/consumer/consumer.py中的Consumer类,该类的加载过程与worker类似,
@python_2_unicode_compatible
class Consumer(object):
    """Consumer blueprint."""

    Strategies = dict

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    restart_count = -1  # first start is the same as a restart

    class Blueprint(bootsteps.Blueprint):
        """Consumer blueprint."""

        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer.connection:Connection',
            'celery.worker.consumer.mingle:Mingle',
            'celery.worker.consumer.events:Events',
            'celery.worker.consumer.gossip:Gossip',
            'celery.worker.consumer.heart:Heart',
            'celery.worker.consumer.control:Control',
            'celery.worker.consumer.tasks:Tasks',
            'celery.worker.consumer.consumer:Evloop',
            'celery.worker.consumer.agent:Agent',
        ]

        def shutdown(self, parent):
            """Send ``'shutdown'`` to the steps via ``send_all``."""
            self.send_all(parent, 'shutdown')

    def __init__(self, on_task_request,
                 init_callback=noop, hostname=None,
                 pool=None, app=None,
                 timer=None, controller=None, hub=None, amqheartbeat=None,
                 worker_options=None, disable_rate_limits=False,
                 initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
        """Store the collaborators and apply the consumer blueprint.

        Arguments:
            on_task_request: Stored as ``self.on_task_request``.
            init_callback: Stored as ``self.init_callback``.
            hostname: Host name; ``gethostname()`` is used when falsy.
            pool: The worker pool instance.
            app: The application; supplies the configuration and the
                read connection.
            timer: The timer instance.
            controller: The owning worker.
            hub: Event loop hub, or a falsy value for none.
            amqheartbeat: Heartbeat value; when ``None`` (and a hub or a
                green pool is present) ``broker_heartbeat`` from the
                configuration is used.
            worker_options: Optional mapping merged with ``kwargs`` and
                passed to ``blueprint.apply``.
            disable_rate_limits: Stored as ``self.disable_rate_limits``.
            initial_prefetch_count: Stored as
                ``self.initial_prefetch_count``.
            prefetch_multiplier: Stored as ``self.prefetch_multiplier``.
            **kwargs: Extra options passed to ``blueprint.apply``.
        """
        self.app = app
        self.controller = controller
        self.init_callback = init_callback
        self.hostname = hostname or gethostname()
        self.pid = os.getpid()
        self.pool = pool
        self.timer = timer
        self.strategies = self.Strategies()
        # Connection object, used here for its error class tuples.
        self.conninfo = self.app.connection_for_read()
        self.connection_errors = self.conninfo.connection_errors
        self.channel_errors = self.conninfo.channel_errors
        # Restart-frequency tracker, stepped in ``start()``.
        self._restart_state = restart_state(maxR=5, maxT=1)

        self._does_info = logger.isEnabledFor(logging.INFO)
        self._limit_order = 0
        self.on_task_request = on_task_request
        self.on_task_message = set()
        self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
        self.disable_rate_limits = disable_rate_limits
        self.initial_prefetch_count = initial_prefetch_count
        self.prefetch_multiplier = prefetch_multiplier

        # this contains a tokenbucket for each task type by name, used for
        # rate limits, or None if rate limits are disabled for that task.
        self.task_buckets = defaultdict(lambda: None)
        self.reset_rate_limits()

        self.hub = hub
        if self.hub or getattr(self.pool, 'is_green', False):
            self.amqheartbeat = amqheartbeat
            if self.amqheartbeat is None:
                self.amqheartbeat = self.app.conf.broker_heartbeat
        else:
            # No hub and no green pool: heartbeat value is set to 0.
            self.amqheartbeat = 0

        if not hasattr(self, 'loop'):
            # Asynchronous loop when a hub is given, otherwise the
            # synchronous one.
            self.loop = loops.asynloop if hub else loops.synloop

        if _detect_environment() == 'gevent':
            # there's a gevent bug that causes timeouts to not be reset,
            # so if the connection timeout is exceeded once, it can NEVER
            # connect again.
            self.app.conf.broker_connection_timeout = None

        self._pending_operations = []

        self.steps = []
        self.blueprint = self.Blueprint(
            steps=self.app.steps['consumer'],
            on_close=self.on_close,
        )
        # Apply the blueprint to this consumer with the merged options.
        self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
当该类初始化完成后,就会调用blueprint.apply方法,此时执行的操作就是加载blueprint类中的default_steps,在此就不一一分析该类的初始化和加载过程。
当所有的类初始化完成后,一个worker就初始化完成了,随后执行到celery/bin/worker.py中的256行,
worker.start()
此时调用的是Worker类(celery/apps/worker.py)从父类WorkController继承的start方法,
def start(self):
    """Start the worker blueprint and map failures to stop/terminate.

    ``WorkerTerminate`` leads to ``self.terminate()``; any other
    ``Exception`` is logged as critical and stops with ``EX_FAILURE``;
    ``SystemExit`` stops with the exit's own code; ``KeyboardInterrupt``
    stops with ``EX_FAILURE``.
    """
    try:
        # Start every step of the blueprint with this worker as parent.
        self.blueprint.start(self)
    except WorkerTerminate:
        self.terminate()
    except Exception as exc:
        logger.critical('Unrecoverable error: %r', exc, exc_info=True)
        self.stop(exitcode=EX_FAILURE)
    except SystemExit as exc:
        self.stop(exitcode=exc.code)
    except KeyboardInterrupt:
        self.stop(exitcode=EX_FAILURE)
此时查看blueprint的start方法,
def start(self, parent):
    """Mark the blueprint as running and start each step in order.

    Arguments:
        parent: Object whose ``steps`` sequence is iterated; ``None``
            entries are skipped.  It is passed to every ``step.start``.
    """
    self.state = RUN
    # Optional hook supplied when the blueprint was created.
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)
        # Count of steps reached so far (1-based).
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')
此时,parent.steps就是在step.include中添加到该数组中,parent.steps目前值为[Hub,Pool,Beat,Consumer],此时调用了worker的on_start方法,
def on_start(self):
    """Run start-up housekeeping before the steps are started.

    Calls the parent ``on_start``, sends ``celeryd_after_setup``,
    optionally purges messages, prints the banner unless quiet, sets the
    process status, installs the platform tweaks, and redirects the
    standard streams when requested.
    """
    app = self.app
    WorkController.on_start(self)

    # this signal can be used to, for example, change queues after
    # the -Q option has been applied.
    signals.celeryd_after_setup.send(
        sender=self.hostname, instance=self, conf=app.conf,
    )

    if self.purge:
        self.purge_messages()

    if not self.quiet:
        # Print the start-up banner.
        self.emit_banner()

    self.set_process_status('-active-')
    # Install the platform specific tweaks and signal handlers.
    self.install_platform_tweaks(self)
    if not self._custom_logging and self.redirect_stdouts:
        app.log.redirect_stdouts(self.redirect_stdouts_level)
父类的on_start就是创建pid文件,
def on_start(self):
    """Create the pid lock when a pid file is configured.

    Does nothing when ``self.pidfile`` is falsy; otherwise stores the
    result of ``create_pidlock`` on ``self.pidlock``.
    """
    if self.pidfile:
        self.pidlock = create_pidlock(self.pidfile)
其中注册相关的信号处理handler的函数如下,
def install_platform_tweaks(self, worker):
    """Install platform specific tweaks and workarounds.

    Arguments:
        worker: The worker passed to each ``install_*_handler`` helper.
    """
    if self.app.IS_macOS:
        self.macOS_proxy_detection_workaround()

    # Install signal handler so SIGHUP restarts the worker.
    if not self._isatty:
        # only install HUP handler if detached from terminal,
        # so closing the terminal window doesn't restart the worker
        # into the background.
        if self.app.IS_macOS:
            # macOS can't exec from a process using threads.
            # See https://github.com/celery/celery/issues#issue/152
            install_HUP_not_supported_handler(worker)
        else:
            # Restart handler (SIGHUP by default).
            install_worker_restart_handler(worker)
    install_worker_term_handler(worker)
    install_worker_term_hard_handler(worker)
    install_worker_int_handler(worker)
    # NOTE(review): the article attributes these two to SIGUSR1 and
    # SIGUSR2 respectively -- confirm against the helpers.
    install_cry_handler()
    install_rdb_handler()
就单独分析一下重启的函数,
def _reload_current_worker():
    """Replace the current process with a fresh run of the same command."""
    # NOTE(review): the list is presumably the set of descriptors to keep
    # open while the others are closed -- confirm against
    # ``platforms.close_open_fds``.
    platforms.close_open_fds([
        sys.stdin, sys.stdout, sys.stderr,
    ])
    # Re-execute the interpreter with the original command line.
    os.execv(sys.executable, [sys.executable] + sys.argv)


def install_worker_restart_handler(worker, sig='SIGHUP'):
    """Install a handler for ``sig`` that schedules a worker restart.

    Arguments:
        worker: The worker (not used by the visible body).
        sig: Name of the signal to handle.
    """

    def restart_worker_sig_handler(*args):
        """Signal handler restarting the current python program."""
        set_in_sighandler(True)
        safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
        import atexit
        # The re-exec runs at interpreter exit.
        atexit.register(_reload_current_worker)
        from celery.worker import state
        # Flag polled elsewhere to make the worker stop.
        state.should_stop = EX_OK
    platforms.signals[sig] = restart_worker_sig_handler
其中的platforms.signals类设置了setitem方法,
def __setitem__(self, name, handler):
    """Install signal handler.

    Does nothing if the current platform has no support for signals,
    or the specified signal in particular.

    Arguments:
        name: Signal identifier, resolved through ``self.signum``.
        handler: Handler passed to ``_signal.signal``.
    """
    try:
        _signal.signal(self.signum(name), handler)
    except (AttributeError, ValueError):
        # Deliberate best-effort: unsupported signals are ignored.
        pass
此时就将相应的handler设置到了运行程序中,_signal就是导入的signal库。
此时继续执行Blueprint的start方法,依次遍历parent.steps并调用每个step的start方法,即依次处理Hub,Pool,Beat,Consumer,
由于Hub重写了start方法,该方法什么都不执行,
def start(self, w):
    """Do nothing: this step has no work to perform on start."""
继续调用Pool的start方法,此时会调用到StartStopStep的start方法,其中的obj就是create方法返回的对象,即pool实例,
def start(self, parent):
    """Start the object held by this step, if there is one.

    Arguments:
        parent: The owner of the step (not used by this body).

    Returns:
        The result of ``self.obj.start()``, or ``None`` when ``self.obj``
        is falsy.
    """
    if self.obj:
        return self.obj.start()
此时就继续调用pool实例的start方法,调用BasePool,
def start(self):
    """Run the ``on_start`` hook and mark the pool as running."""
    # Cache whether debug logging is enabled.
    self._does_debug = logger.isEnabledFor(logging.DEBUG)
    self.on_start()
    self._state = self.RUN
其中self.on_start()调用的是子类prefork.TaskPool实现的on_start,
def on_start(self):
    """Create the underlying pool and expose its methods on this object."""
    forking_enable(self.forking_enable)
    # ``BlockingPool`` when the ``threads`` option is truthy (also the
    # default when the option is absent), otherwise ``self.Pool``.
    # NOTE(review): the article states the latter is AsynPool -- confirm.
    Pool = (self.BlockingPool if self.options.get('threads', True)
            else self.Pool)
    P = self._pool = Pool(processes=self.limit,
                          initializer=process_initializer,
                          on_process_exit=process_destructor,
                          enable_timeouts=True,
                          synack=False,
                          **self.options)

    # Create proxy methods
    self.on_apply = P.apply_async
    self.maintain_pool = P.maintain_pool
    self.terminate_job = P.terminate_job
    self.grow = P.grow
    self.shrink = P.shrink
    self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
继续调用Beat的start方法,Beat与上面的Pool相同,由于默认使用的是进程启动,所以此时的start方法就是启动进程,
def start(self):
    '''
    Start child process

    Raises ``AssertionError`` when the process was already started or
    when called from a process other than the one that created it.
    '''
    assert self._popen is None, 'cannot start a process twice'
    assert self._parent_pid == os.getpid(), \
        'can only start a process object created by current process'
    _cleanup()
    self._popen = self._Popen(self)
    self._sentinel = self._popen.sentinel
    # Register this process in the module-level children set.
    _children.add(self)
继续调用Consumer的start方法,
def start(self):
    """Run the consumer blueprint until it is closed, restarting on errors.

    Connection errors are re-raised when ``broker_connection_retry`` is
    disabled or when the error is ``EMFILE``; otherwise the matching
    connection-error hook is called and the blueprint is restarted.
    """
    blueprint = self.blueprint
    while blueprint.state != CLOSE:
        # Shut down here if a stop/terminate flag has been set.
        maybe_shutdown()
        if self.restart_count:
            try:
                # Record this restart; raises when restarts are too
                # frequent.
                self._restart_state.step()
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1
        try:
            blueprint.start(self)
        except self.connection_errors as exc:
            # If we're not retrying connections, no need to catch
            # connection errors
            if not self.app.conf.broker_connection_retry:
                raise
            if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                raise  # Too many open files
            maybe_shutdown()
            if blueprint.state != CLOSE:
                if self.connection:
                    self.on_connection_error_after_connected(exc)
                else:
                    self.on_connection_error_before_connected(exc)
                self.on_close()
                blueprint.restart(self)
此时又进入到了blueprint的start方法,此时该blueprint的steps值是由Consumer在初始化的时候传入的,所以依次遍历step并调用其start方法时,是根据传入的steps来执行的。传入的steps是Agent,Connection,Evloop,Control,Events,Gossip,Heart,Mingle,Tasks类的实例,最后添加到parent.steps中的实例就是[Connection,Events,Heart,Mingle,Tasks,Control,Gossip,Evloop],此时就依次调用这些实例的start方法,
首先分析一下Connection的start方法,
def start(self, c):
    """Connect the consumer and log the connection URI.

    Arguments:
        c: The consumer; the result of ``c.connect()`` is stored on
            ``c.connection``.
    """
    c.connection = c.connect()
    info('Connected to %s', c.connection.as_uri())
就是调用了consumer的connect()函数,
def connect(self):
    """Establish the broker connection.

    Retries establishing the connection if the
    :setting:`broker_connection_retry` setting is enabled

    Returns:
        The connection object.
    """
    conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)

    # Callback called for each retry while the connection
    # can't be established.
    def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
        if getattr(conn, 'alt', None) and interval == 0:
            next_step = CONNECTION_FAILOVER
        error(CONNECTION_ERROR, conn.as_uri(), exc,
              next_step.format(when=humanize_seconds(interval, 'in', ' ')))

    # remember that the connection is lazy, it won't establish
    # until needed.
    if not self.app.conf.broker_connection_retry:
        # retry disabled, just call connect directly.
        conn.connect()
        return conn

    conn = conn.ensure_connection(
        _error_handler, self.app.conf.broker_connection_max_retries,
        callback=maybe_shutdown,
    )
    if self.hub:
        # Register the connection with the event loop hub.
        conn.transport.register_with_event_loop(conn.connection, self.hub)
    return conn
此时就建立了连接。
继续分析Tasks的start方法,
def start(self, c):
    """Start task consumer.

    Arguments:
        c: The consumer; ``c.task_consumer`` and ``c.qos`` are assigned.
    """
    c.update_strategies()

    # - RabbitMQ 3.3 completely redefines how basic_qos works..
    # This will detect if the new qos semantics is in effect,
    # and if so make sure the 'apply_global' flag is set on qos updates.
    qos_global = not c.connection.qos_semantics_matches_spec

    # set initial prefetch count
    c.connection.default_channel.basic_qos(
        0, c.initial_prefetch_count, qos_global,
    )

    # Create the task consumer; ``consume()`` is not called here.
    c.task_consumer = c.app.amqp.TaskConsumer(
        c.connection, on_decode_error=c.on_decode_error,
    )

    def set_prefetch_count(prefetch_count):
        return c.task_consumer.qos(
            prefetch_count=prefetch_count,
            apply_global=qos_global,
        )
    # QoS object seeded with the initial prefetch count; it applies
    # changes through ``set_prefetch_count``.
    c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
此时就开启了对应的任务消费,开启消费后我们继续分析一下loop的开启,
def start(self, c):
    """Apply ``patch_all`` and run the consumer's loop.

    Arguments:
        c: The consumer; ``c.loop`` is called with the positional
            arguments returned by ``c.loop_args()``.
    """
    self.patch_all(c)
    c.loop(*c.loop_args())
此时就是调用了consumer中的loop函数,该loop函数就是位于celery/worker/loops.py中的asynloop函数,
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop.

    Arguments:
        obj: The consumer instance driving the loop.
        connection: Broker connection.
        consumer: Task consumer; its ``on_message`` is set here and
            ``consume()`` is called.
        blueprint: Blueprint whose ``state`` keeps the loop running.
        hub: Event loop hub.
        qos: QoS object; updated when its value changed.
        heartbeat: Not used directly in this body.
        clock: Not used directly in this body.
        hbrate: Rate passed to ``_enable_amqheartbeats``.

    Raises:
        WorkerLostError: On first start, if the pool did not start.
        WorkerShutdown: When ``state.should_stop`` is set.
        WorkerTerminate: When ``state.should_terminate`` is set.
    """
    RUN = bootsteps.RUN
    update_qos = qos.update
    errors = connection.connection_errors

    # Handler invoked for every received task message.
    on_task_received = obj.create_task_handler()

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = on_task_received
    consumer.consume()
    obj.on_ready()
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)

    # did_start_ok will verify that pool processes were able to start,
    # but this will only work the first time we start, as
    # maxtasksperchild will mess up metrics.
    if not obj.restart_count and not obj.pool.did_start_ok():
        raise WorkerLostError('Could not start worker processes')

    # consumer.consume() may have prefetched up to our
    # limit - drain an event so we're in a clean state
    # prior to starting our event loop.
    if connection.transport.driver_type == 'amqp':
        hub.call_soon(_quick_drain, connection)

    # FIXME: Use loop.run_forever
    # Tried and works, but no time to test properly before release.
    hub.propagate_errors = errors
    # The loop is advanced with ``next()`` below.
    loop = hub.create_loop()

    try:
        while blueprint.state == RUN and obj.connection:
            # shutdown if signal handlers told us to.
            should_stop, should_terminate = (
                state.should_stop, state.should_terminate,
            )
            # False == EX_OK, so must use is not False
            if should_stop is not None and should_stop is not False:
                raise WorkerShutdown(should_stop)
            # Each flag must be compared against itself: testing
            # ``should_stop`` here would ignore or misfire terminate
            # requests depending on the unrelated stop flag.
            elif (should_terminate is not None and
                    should_terminate is not False):
                raise WorkerTerminate(should_terminate)

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)
            except StopIteration:
                # The loop was exhausted: create a new one.
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
至此,异步Loop就开启了,然后就开始了服务端的事件等待处理,至此,worker流程就启动完毕。
本文总结
本文继续分析了worker的启动流程:celery默认以进程的方式启动定时任务,然后通过异步IO处理消费端的事件。这其中也涉及到celery中worker对任务消费的一些处理细节,但是没有做一一分析,至此worker的启动流程概述已分析完毕。
转载地址:https://blog.csdn.net/qq_33339479/article/details/80958059 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!