Celery source code analysis: worker initialization (part 2)
Published: 2021-07-25 13:04:38  Category: Technical article


Celery source code analysis

Environment for this article: Python 3.5.2, Celery 4.0.2, Django 1.10.x

Starting the Celery worker

The previous article covered the initialization of the Hub class. Next, let's continue with the initialization of the Pool class:

```python
class Pool(bootsteps.StartStopStep):
    """Bootstep managing the worker pool.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker start-up/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency
    """

    requires = (Hub,)
    ...

    def create(self, w):
        semaphore = None
        max_restarts = None
        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
            # worker_pool is 'eventlet' or 'gevent' (the default is prefork)
            warnings.warn(UserWarning(W_POOL_SETTING))
        # use threads if the event loop is disabled or we are on Windows
        threaded = not w.use_eventloop or IS_WINDOWS
        procs = w.min_concurrency                 # minimum pool size (concurrency defaults to the CPU count)
        w.process_task = w._process_task          # bind the worker's _process_task as process_task
        if not threaded:                          # not using threads
            # LaxBoundedSemaphore caps concurrent tasks; pending callbacks wait in a queue
            semaphore = w.semaphore = LaxBoundedSemaphore(procs)
            w._quick_acquire = w.semaphore.acquire  # expose acquire/release on the worker
            w._quick_release = w.semaphore.release
            max_restarts = 100                    # maximum number of restarts
            if w.pool_putlocks and w.pool_cls.uses_semaphore:
                # with the default configuration, process_task is replaced here
                w.process_task = w._process_task_sem
        allow_restart = w.pool_restarts           # whether pool restarts are allowed
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,        # w.pool_cls defaults to prefork.TaskPool
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        _set_task_join_will_block(pool.task_join_will_block)
        return pool
```

Calling s.include at this point ends up in Pool's create method, which instantiates the worker pool (by default the prefork process pool).
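To make the role of LaxBoundedSemaphore more concrete, here is a minimal sketch of a callback-based bounded semaphore in the same spirit. It is a simplified illustration, not kombu's actual implementation, and the class name `LaxSemaphoreSketch` is hypothetical. `acquire()` runs the callback immediately if a slot is free and otherwise queues it; `release()` hands the freed slot to the next queued callback. "Lax" here means that surplus releases are tolerated and simply capped at the initial value.

```python
from collections import deque


class LaxSemaphoreSketch:
    """Hypothetical, simplified callback-based bounded semaphore."""

    def __init__(self, value):
        self.initial_value = self.value = value
        self._waiting = deque()  # callbacks waiting for a free slot

    def acquire(self, callback, *args):
        """Run callback now if a slot is free, else queue it.

        Returns True if the callback ran immediately.
        """
        if self.value <= 0:
            self._waiting.append((callback, args))
            return False
        self.value -= 1
        callback(*args)
        return True

    def release(self):
        """Pass the slot to the next waiter, or free it (capped at the initial value)."""
        try:
            callback, args = self._waiting.popleft()
        except IndexError:
            self.value = min(self.value + 1, self.initial_value)
        else:
            callback(*args)


if __name__ == '__main__':
    ran = []
    sem = LaxSemaphoreSketch(2)
    for name in 'abc':
        sem.acquire(ran.append, name)
    print(ran)      # ['a', 'b']  ('c' is queued)
    sem.release()   # the freed slot goes straight to 'c'
    print(ran)      # ['a', 'b', 'c']
```

Because waiters are plain callbacks rather than blocked threads, this style fits a single-threaded event loop, which matches the fact that the semaphore is only created in the non-threaded branch above.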

Next, execution reaches the Beat step:

```python
class Beat(bootsteps.StartStopStep):
    """Step used to embed a beat process.

    Enabled when the ``beat`` argument is set.
    """

    label = 'Beat'
    conditional = True

    def __init__(self, w, beat=False, **kwargs):
        self.enabled = w.beat = beat
        w.beat = None
        super(Beat, self).__init__(w, beat=beat, **kwargs)

    def create(self, w):
        from celery.beat import EmbeddedService
        # check whether pool_cls is 'gevent' or 'eventlet'
        if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
            # if so, the embedded periodic-task scheduler cannot be used
            raise ImproperlyConfigured(ERR_B_GREEN)
        # run the scheduler in a thread or in a child process
        b = w.beat = EmbeddedService(w.app,
                                     schedule_filename=w.schedule_filename,
                                     scheduler_cls=w.scheduler)
        return b
```

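EmbeddedService decides from the arguments it receives whether to run the periodic-task scheduler in a thread or in a child process. Since Beat.create does not ask for a thread, a child process is used by default.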
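The thread-or-process decision can be sketched as a small factory. This is a hypothetical illustration (`make_embedded_service` and `run_scheduler` are made-up names) that assumes a child process is the default and a thread is used only on request; Celery's real EmbeddedService wraps its own scheduler service classes instead of a bare function.

```python
import multiprocessing
import threading


def run_scheduler():
    """Hypothetical stand-in for the beat scheduler's main loop."""
    print('scheduler tick')


def make_embedded_service(target, thread=False):
    """Return an unstarted thread or process that will run `target`."""
    if thread:
        # daemon thread: it will not keep the worker alive at shutdown
        return threading.Thread(target=target, name='Beat', daemon=True)
    # default: a separate child process
    return multiprocessing.Process(target=target, name='Beat')


if __name__ == '__main__':
    service = make_embedded_service(run_scheduler, thread=True)
    service.start()
    service.join()
```

Both objects expose the same `start()`/`join()` interface, which is what lets the worker's step machinery start the beat service without caring which kind it got.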

The Consumer step is loaded as follows:

```python
class Consumer(bootsteps.StartStopStep):
    """Bootstep starting the Consumer blueprint."""

    last = True

    def create(self, w):
        if w.max_concurrency:                     # set when autoscaling is enabled
            # minimum concurrency (at least 1) times the prefetch multiplier
            prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
        else:
            # configured concurrency times the prefetch multiplier
            prefetch_count = w.concurrency * w.prefetch_multiplier
        c = w.consumer = self.instantiate(
            w.consumer_cls, w.process_task,       # instantiate the Consumer class
            hostname=w.hostname,
            task_events=w.task_events,
            init_callback=w.ready_callback,
            initial_prefetch_count=prefetch_count,
            pool=w.pool,
            timer=w.timer,
            app=w.app,
            controller=w,
            hub=w.hub,
            worker_options=w.options,
            disable_rate_limits=w.disable_rate_limits,
            prefetch_multiplier=w.prefetch_multiplier,
        )
        return c
```
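The prefetch arithmetic can be checked with a tiny standalone function. It is a hypothetical helper that reproduces only the branch logic above, assuming, as in Celery 4.x, that `--autoscale=max,min` sets max_concurrency and min_concurrency from that pair and that prefetch_multiplier defaults to 4.

```python
def initial_prefetch_count(concurrency, prefetch_multiplier, autoscale=None):
    """Hypothetical helper mirroring the Consumer step's prefetch logic.

    autoscale is a (max_concurrency, min_concurrency) tuple or None.
    """
    if autoscale:
        _, min_concurrency = autoscale
        # with autoscaling, start from the minimum (but at least 1 process)
        return max(min_concurrency, 1) * prefetch_multiplier
    return concurrency * prefetch_multiplier


if __name__ == '__main__':
    print(initial_prefetch_count(4, 4))           # 16
    print(initial_prefetch_count(4, 4, (10, 3)))  # 12
    print(initial_prefetch_count(4, 4, (10, 0)))  # 4
```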

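This instantiates the Consumer class defined in celery/worker/consumer/consumer.py. It is set up much like the worker itself, with its own Blueprint and a list of default steps: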

```python
@python_2_unicode_compatible
class Consumer(object):
    """Consumer blueprint."""

    Strategies = dict

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    restart_count = -1  # first start is the same as a restart

    class Blueprint(bootsteps.Blueprint):
        """Consumer blueprint."""

        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer.connection:Connection',
            'celery.worker.consumer.mingle:Mingle',
            'celery.worker.consumer.events:Events',
            'celery.worker.consumer.gossip:Gossip',
            'celery.worker.consumer.heart:Heart',
            'celery.worker.consumer.control:Control',
            'celery.worker.consumer.tasks:Tasks',
            'celery.worker.consumer.consumer:Evloop',
            'celery.worker.consumer.agent:Agent',
        ]

        def shutdown(self, parent):
            self.send_all(parent, 'shutdown')

    def __init__(self, on_task_request,
                 init_callback=noop, hostname=None,
                 pool=None, app=None,
                 timer=None, controller=None, hub=None, amqheartbeat=None,
                 worker_options=None, disable_rate_limits=False,
                 initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
        self.app = app                            # the app
        self.controller = controller              # the worker (WorkController) instance
        self.init_callback = init_callback        # callback run once initialization completes
        self.hostname = hostname or gethostname() # hostname of this node
        self.pid = os.getpid()                    # pid of the current process
        self.pool = pool                          # the worker pool
        self.timer = timer                        # the timer
        self.strategies = self.Strategies()
        self.conninfo = self.app.connection_for_read()  # broker connection info
        self.connection_errors = self.conninfo.connection_errors
        self.channel_errors = self.conninfo.channel_errors
        self._restart_state = restart_state(maxR=5, maxT=1)  # restart-frequency tracker
        self._does_info = logger.isEnabledFor(logging.INFO)
        self._limit_order = 0
        self.on_task_request = on_task_request
        self.on_task_message = set()
        self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate  # heartbeat check rate
        self.disable_rate_limits = disable_rate_limits
        self.initial_prefetch_count = initial_prefetch_count
        self.prefetch_multiplier = prefetch_multiplier

        # this contains a tokenbucket for each task type by name, used for
        # rate limits, or None if rate limits are disabled for that task.
        self.task_buckets = defaultdict(lambda: None)
        self.reset_rate_limits()

        self.hub = hub                            # the hub (event loop)
        if self.hub or getattr(self.pool, 'is_green', False):
            self.amqheartbeat = amqheartbeat
            if self.amqheartbeat is None:
                self.amqheartbeat = self.app.conf.broker_heartbeat
        else:
            self.amqheartbeat = 0

        if not hasattr(self, 'loop'):             # choose the loop function
            self.loop = loops.asynloop if hub else loops.synloop

        if _detect_environment() == 'gevent':     # running under gevent?
            # there's a gevent bug that causes timeouts to not be reset,
            # so if the connection timeout is exceeded once, it can NEVER
            # connect again.
            self.app.conf.broker_connection_timeout = None

        self._pending_operations = []

        self.steps = []
        self.blueprint = self.Blueprint(
            steps=self.app.steps['consumer'],
            on_close=self.on_close,
        )                                         # create the blueprint
        self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))  # and apply it
```

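Once the class has been initialized, blueprint.apply is called, which loads the default_steps listed in the Blueprint class. We won't go through each step's initialization and loading here.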
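The entries in default_steps are strings of the form `'package.module:ClassName'`, which have to be turned into classes before they can be instantiated. In Celery 4.x's bootsteps this is done with kombu's `symbol_by_name`; the function below is a simplified, hypothetical stand-in (`resolve_step`) that only shows the idea using `importlib`.

```python
import importlib


def resolve_step(name):
    """Resolve 'package.module:Attribute' (or 'package.module.Attribute').

    Hypothetical, simplified stand-in for kombu.utils.imports.symbol_by_name.
    """
    module_name, sep, attr = name.partition(':')
    if not sep:
        # also accept the dotted form 'package.module.Attribute'
        module_name, _, attr = name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


if __name__ == '__main__':
    # Celery would pass e.g. 'celery.worker.consumer.connection:Connection';
    # a standard-library target keeps this example dependency-free.
    print(resolve_step('collections:OrderedDict'))
```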

Once all these classes are initialized, the worker itself is fully initialized, and execution returns to line 256 of celery/bin/worker.py:

```python
worker.start()
```

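The method called here is start(), which the Worker class in celery/apps/worker.py inherits from WorkController in celery/worker/worker.py: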

```python
def start(self):
    try:
        self.blueprint.start(self)                # call the blueprint's start method
    except WorkerTerminate:
        self.terminate()
    except Exception as exc:
        logger.critical('Unrecoverable error: %r', exc, exc_info=True)
        self.stop(exitcode=EX_FAILURE)
    except SystemExit as exc:
        self.stop(exitcode=exc.code)
    except KeyboardInterrupt:
        self.stop(exitcode=EX_FAILURE)
```

Now let's look at Blueprint's start method:

```python
def start(self, parent):
    self.state = RUN                              # mark the blueprint as running
    if self.on_start:                             # run the on_start callback if one was passed in
        self.on_start()
    # iterate over the steps and call each step's start method in turn
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')
```

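Here parent.steps is the list that step.include appended to; its value is now [Hub, Pool, Beat, Consumer] (the Beat step is only present when beat is enabled, e.g. with -B). Before the loop runs, the worker's on_start method is called: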

```python
def on_start(self):
    app = self.app
    WorkController.on_start(self)                 # call the parent class's on_start

    # this signal can be used to, for example, change queues after
    # the -Q option has been applied.
    signals.celeryd_after_setup.send(
        sender=self.hostname, instance=self, conf=app.conf,
    )

    if self.purge:
        self.purge_messages()

    if not self.quiet:
        self.emit_banner()                        # print the startup banner

    self.set_process_status('-active-')
    self.install_platform_tweaks(self)            # install the signal handlers
    if not self._custom_logging and self.redirect_stdouts:
        app.log.redirect_stdouts(self.redirect_stdouts_level)
```

The parent class's on_start simply creates the pid file:

```python
def on_start(self):
    if self.pidfile:
        self.pidlock = create_pidlock(self.pidfile)  # create the pid file
```

The function that installs the signal handlers is:

```python
def install_platform_tweaks(self, worker):
    """Install platform specific tweaks and workarounds."""
    if self.app.IS_macOS:
        self.macOS_proxy_detection_workaround()

    # Install signal handler so SIGHUP restarts the worker.
    if not self._isatty:
        # only install HUP handler if detached from terminal,
        # so closing the terminal window doesn't restart the worker
        # into the background.
        if self.app.IS_macOS:
            # macOS can't exec from a process using threads.
            # See https://github.com/celery/celery/issues#issue/152
            install_HUP_not_supported_handler(worker)
        else:
            install_worker_restart_handler(worker)  # restart on SIGHUP
    install_worker_term_handler(worker)             # SIGTERM: warm shutdown
    install_worker_term_hard_handler(worker)        # SIGQUIT: cold shutdown
    install_worker_int_handler(worker)              # SIGINT
    install_cry_handler()                           # SIGUSR1 handler
    install_rdb_handler()                           # SIGUSR2 handler
```

Let's look at the restart handler on its own:

```python
def _reload_current_worker():
    platforms.close_open_fds([
        sys.stdin, sys.stdout, sys.stderr,
    ])  # close open file descriptors, keeping the standard streams
    os.execv(sys.executable, [sys.executable] + sys.argv)  # re-exec the program


def install_worker_restart_handler(worker, sig='SIGHUP'):

    def restart_worker_sig_handler(*args):
        """Signal handler restarting the current python program."""
        set_in_sighandler(True)
        safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
        import atexit
        atexit.register(_reload_current_worker)   # run when the interpreter exits
        from celery.worker import state
        state.should_stop = EX_OK                 # ask the worker to stop
    platforms.signals[sig] = restart_worker_sig_handler
```

The platforms.signals object implements __setitem__:

```python
def __setitem__(self, name, handler):
    """Install signal handler.

    Does nothing if the current platform has no support for signals,
    or the specified signal in particular.
    """
    try:
        _signal.signal(self.signum(name), handler)
    except (AttributeError, ValueError):
        pass
```

This installs the handler in the running process; _signal is the standard library signal module.
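The pattern of installing handlers by signal name, and silently skipping signals the platform lacks, can be reproduced with the standard `signal` module. `SignalsSketch`, `WorkerState` and `restart_handler` are hypothetical names for this sketch; Celery's real restart handler also registers an atexit hook that re-execs the process, which is left out here. The usage below uses SIGTERM only because it exists on every platform; Celery installs the restart handler on SIGHUP.

```python
import signal


class SignalsSketch:
    """Hypothetical, simplified name-based signal registry."""

    def signum(self, name):
        """Turn 'SIGHUP', 'HUP' or an int into a signal number."""
        if isinstance(name, int):
            return name
        if not name.startswith('SIG'):
            name = 'SIG' + name
        return getattr(signal, name)  # AttributeError if the platform lacks it

    def __setitem__(self, name, handler):
        try:
            signal.signal(self.signum(name), handler)
        except (AttributeError, ValueError):
            pass  # unsupported on this platform: silently ignored

    def __getitem__(self, name):
        return signal.getsignal(self.signum(name))


class WorkerState:
    should_stop = None  # hypothetical stand-in for celery.worker.state


def restart_handler(signum, frame):
    # only sets the stop flag (0 plays the role of EX_OK); no re-exec here
    WorkerState.should_stop = 0


if __name__ == '__main__':
    signals = SignalsSketch()
    signals['SIGNOTREAL'] = restart_handler  # unknown name: ignored
    signals['SIGTERM'] = restart_handler
    print(signals['TERM'] is restart_handler)
```

Note that `signal.signal` may only be called from the main thread, which is one reason Celery installs these handlers during worker startup.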

Execution then continues in Blueprint.start, which iterates over parent.steps and calls each step's start method in turn: Hub, Pool, Beat, Consumer.

Hub overrides start with a method that does nothing:

```python
def start(self, w):
    pass
```

Next comes Pool. Its start method is inherited from StartStopStep, where self.obj is the object returned by create(), here the pool instance:

```python
def start(self, parent):
    if self.obj:
        return self.obj.start()
```
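The interplay between include(), create(), self.obj and Blueprint.start can be summarized in a small self-contained model. All class names here (`Service`, `Step`, `HubStep`, `Parent`, `MiniBlueprint`) are hypothetical and heavily simplified; the real bootsteps also handle dependency ordering, stop/terminate and debug logging.

```python
RUN = 1  # hypothetical state constant


class Service:
    """Stand-in for what a step's create() returns (a pool, a beat service...)."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    def start(self):
        self.log.append(self.name)


class Step:
    """Simplified StartStopStep: include() calls create() and registers the step."""

    def __init__(self, alias, log, enabled=True):
        self.alias, self.log, self.enabled = alias, log, enabled
        self.obj = None

    def create(self, parent):
        return Service(self.alias, self.log)

    def include(self, parent):
        if self.enabled:  # disabled conditional steps are skipped
            self.obj = self.create(parent)
            parent.steps.append(self)

    def start(self, parent):
        if self.obj:
            return self.obj.start()


class HubStep(Step):
    def start(self, parent):
        pass  # like Hub.start: nothing to do at this point


class Parent:
    def __init__(self):
        self.steps = []


class MiniBlueprint:
    def __init__(self, on_start=None):
        self.state, self.started, self.on_start = None, 0, on_start

    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)


if __name__ == '__main__':
    log = []
    worker = Parent()
    HubStep('Hub', log).include(worker)
    Step('Pool', log).include(worker)
    Step('Beat', log, enabled=False).include(worker)  # e.g. started without -B
    Step('Consumer', log).include(worker)
    MiniBlueprint(on_start=lambda: log.append('on_start')).start(worker)
    print(log)  # ['on_start', 'Pool', 'Consumer']
```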

This calls the pool instance's start method, defined in BasePool:

```python
def start(self):
    self._does_debug = logger.isEnabledFor(logging.DEBUG)
    self.on_start()
    self._state = self.RUN
```

Here self.on_start() resolves to the on_start of the subclass, the prefork TaskPool:

```python
def on_start(self):
    forking_enable(self.forking_enable)
    # BlockingPool if the 'threads' option is true (no event loop, or Windows);
    # otherwise self.Pool, i.e. the asynchronous AsynPool
    Pool = (self.BlockingPool if self.options.get('threads', True)
            else self.Pool)
    P = self._pool = Pool(processes=self.limit,
                          initializer=process_initializer,
                          on_process_exit=process_destructor,
                          enable_timeouts=True,
                          synack=False,
                          **self.options)   # instantiate the pool

    # Create proxy methods: expose the underlying pool's methods on the TaskPool
    self.on_apply = P.apply_async
    self.maintain_pool = P.maintain_pool
    self.terminate_job = P.terminate_job
    self.grow = P.grow
    self.shrink = P.shrink
    self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
```

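Beat is handled the same way. Because the embedded beat service runs as a child process by default, its start method is the start method of the underlying Process class, which launches the child process: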

```python
def start(self):
    '''
    Start child process
    '''
    assert self._popen is None, 'cannot start a process twice'
    assert self._parent_pid == os.getpid(), \
        'can only start a process object created by current process'
    _cleanup()
    self._popen = self._Popen(self)
    self._sentinel = self._popen.sentinel
    _children.add(self)
```

Finally, the Consumer's start method is called:

```python
def start(self):
    blueprint = self.blueprint
    while blueprint.state != CLOSE:               # loop until the blueprint is closed
        maybe_shutdown()                          # raise if a shutdown flag has been set
        if self.restart_count:                    # -1 on the first start, so this runs then too
            try:
                self._restart_state.step()        # record the (re)start; raises if too frequent
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1                   # count this start
        try:
            blueprint.start(self)                 # start all consumer steps
        except self.connection_errors as exc:
            # If we're not retrying connections, no need to catch
            # connection errors
            if not self.app.conf.broker_connection_retry:
                raise
            if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                raise  # Too many open files
            maybe_shutdown()
            if blueprint.state != CLOSE:          # not closing: handle the error and restart
                if self.connection:
                    self.on_connection_error_after_connected(exc)
                else:
                    self.on_connection_error_before_connected(exc)
                self.on_close()
                blueprint.restart(self)           # restart the blueprint
```
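The loop above protects itself against restart storms through `self._restart_state = restart_state(maxR=5, maxT=1)`, created in the Consumer's `__init__`. The sketch below illustrates the idea of allowing at most `max_restarts` restarts within a `max_seconds` window; `RestartThrottle` is a hypothetical, simplified class, not billiard's actual `restart_state` code.

```python
import time


class RestartFreqExceeded(Exception):
    """Raised when restarts happen too often."""


class RestartThrottle:
    """Hypothetical sketch: at most max_restarts within max_seconds."""

    def __init__(self, max_restarts=5, max_seconds=1.0):
        self.max_restarts = max_restarts
        self.max_seconds = max_seconds
        self.count = 0
        self.window_start = None

    def step(self, now=None):
        now = time.monotonic() if now is None else now
        if self.window_start is None or now - self.window_start >= self.max_seconds:
            # first call, or the window has expired: start a new window
            self.window_start, self.count = now, 0
        elif self.count >= self.max_restarts:
            self.count = 0  # reset so a caller that catches the error can go on
            raise RestartFreqExceeded(
                '%d restarts in %.1fs' % (self.max_restarts, self.max_seconds))
        self.count += 1


if __name__ == '__main__':
    throttle = RestartThrottle(max_restarts=5, max_seconds=1.0)
    try:
        for _ in range(10):  # ten restarts in quick succession
            throttle.step()
    except RestartFreqExceeded as exc:
        print('Frequent restarts detected: %r' % (exc,))
```

As in Consumer.start, the caller is expected to catch the exception, log it and back off (the real loop sleeps for one second) rather than exit.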

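This brings us back into Blueprint.start, this time for the Consumer's own blueprint, whose steps were passed in when the Consumer was initialized. The steps provided are instances of Agent, Connection, Evloop, Control, Events, Gossip, Heart, Mingle and Tasks; after the blueprint orders them by their dependencies and runs each step's include(), the instances actually appended to parent.steps are [Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Evloop]. Their start methods are then called in that order.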

First, the start method of Connection:

```python
def start(self, c):
    c.connection = c.connect()
    info('Connected to %s', c.connection.as_uri())
```

It simply calls the consumer's connect() method:

```python
def connect(self):
    """Establish the broker connection.

    Retries establishing the connection if the
    :setting:`broker_connection_retry` setting is enabled
    """
    conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)  # with heartbeat

    # Callback called for each retry while the connection
    # can't be established.
    def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
        if getattr(conn, 'alt', None) and interval == 0:
            next_step = CONNECTION_FAILOVER
        error(CONNECTION_ERROR, conn.as_uri(), exc,
              next_step.format(when=humanize_seconds(interval, 'in', ' ')))

    # remember that the connection is lazy, it won't establish
    # until needed.
    if not self.app.conf.broker_connection_retry:
        # retry disabled, just call connect directly.
        conn.connect()
        return conn

    conn = conn.ensure_connection(
        _error_handler, self.app.conf.broker_connection_max_retries,
        callback=maybe_shutdown,
    )                                             # retry until connected
    if self.hub:
        # register the transport with the event loop for asynchronous I/O
        conn.transport.register_with_event_loop(conn.connection, self.hub)
    return conn
```

At this point the broker connection is established.
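The retry behaviour that `ensure_connection` provides (call an error callback, wait, back off, give up after `max_retries`) can be sketched without a broker. `ensure_connection_sketch` and its interval values are hypothetical and only illustrative; it is not kombu's implementation, and it omits the `callback` hook that Celery uses to run `maybe_shutdown` between retries.

```python
import time


def ensure_connection_sketch(connect, errback=None, max_retries=None,
                             interval_start=2.0, interval_step=2.0,
                             interval_max=30.0, sleep=time.sleep):
    """Hypothetical retry helper: call connect() until it succeeds.

    errback(exc, interval) is called before each wait; max_retries=None
    means retry forever. The last error is re-raised when retries run out.
    """
    interval = interval_start
    retries = 0
    while True:
        try:
            return connect()
        except ConnectionError as exc:
            if max_retries is not None and retries >= max_retries:
                raise
            if errback:
                errback(exc, interval)
            sleep(interval)
            retries += 1
            # linear back-off, capped at interval_max
            interval = min(interval + interval_step, interval_max)


if __name__ == '__main__':
    attempts = []

    def flaky_connect():
        attempts.append(1)
        if len(attempts) < 3:
            raise ConnectionError('broker unavailable')
        return 'connection'

    print(ensure_connection_sketch(
        flaky_connect,
        errback=lambda exc, i: print('retrying in %ss: %s' % (i, exc)),
        sleep=lambda s: None))  # skip real waiting in this demo
```

Injecting `sleep` keeps the sketch testable; in the real worker the error handler is the `_error_handler` shown above, which logs the next retry time.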

Next, the start method of the Tasks step:

```python
def start(self, c):
    """Start task consumer."""
    c.update_strategies()                         # set up strategies for the registered tasks

    # - RabbitMQ 3.3 completely redefines how basic_qos works..
    # This will detect if the new qos semantics is in effect,
    # and if so make sure the 'apply_global' flag is set on qos updates.
    qos_global = not c.connection.qos_semantics_matches_spec

    # set initial prefetch count
    c.connection.default_channel.basic_qos(
        0, c.initial_prefetch_count, qos_global,
    )

    c.task_consumer = c.app.amqp.TaskConsumer(
        c.connection, on_decode_error=c.on_decode_error,
    )                                             # create the task consumer (consume() is called later)

    def set_prefetch_count(prefetch_count):
        return c.task_consumer.qos(
            prefetch_count=prefetch_count,
            apply_global=qos_global,
        )
    c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)  # prefetch-count manager
```

This sets up the task consumer. Next, let's look at how the event loop is started by the Evloop step:
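The QoS object wraps `set_prefetch_count` so that prefetch changes are recorded first and only sent to the broker when the event loop decides to (the `qos.prev != qos.value` check in asynloop). The class below is a hypothetical, simplified model of that batching, not kombu's `QoS` class itself; in AMQP a prefetch count of 0 means "no limit", so it is left untouched.

```python
class QoSSketch:
    """Hypothetical, simplified prefetch-count holder with deferred updates."""

    def __init__(self, callback, initial_value):
        self.callback = callback   # e.g. set_prefetch_count from the Tasks step
        self.value = initial_value
        self.prev = None           # last value actually sent

    def increment_eventually(self, n=1):
        # record the change; it is only sent to the broker on update()
        if self.value:  # 0 means "unlimited": leave it alone
            self.value = max(self.value + n, 0)
        return self.value

    def decrement_eventually(self, n=1):
        if self.value:
            self.value = max(self.value - n, 1)
        return self.value

    def set(self, pcount):
        if pcount != self.prev:
            self.callback(pcount)
        self.prev = pcount
        return pcount

    def update(self):
        return self.set(self.value)


if __name__ == '__main__':
    sent = []
    qos = QoSSketch(sent.append, 16)
    qos.update()                    # initial value is sent once
    qos.increment_eventually()
    qos.increment_eventually()
    qos.decrement_eventually()
    if qos.prev != qos.value:       # the check asynloop performs each iteration
        qos.update()
    print(sent)                     # [16, 17]
```

Three local changes result in a single broker call, which is the grouping effect described in the asynloop comments.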

```python
def start(self, c):
    self.patch_all(c)
    c.loop(*c.loop_args())
```

This calls the consumer's loop function, which (when a hub is in use) is the asynloop function in celery/worker/loops.py:

```python
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""               # obj is the Consumer instance
    RUN = bootsteps.RUN                           # the "running" state
    update_qos = qos.update
    errors = connection.connection_errors

    on_task_received = obj.create_task_handler()  # create the task-message handler

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)  # periodic heartbeats

    consumer.on_message = on_task_received        # route incoming messages to the handler
    consumer.consume()                            # start consuming
    obj.on_ready()                                # run the ready callbacks
    obj.controller.register_with_event_loop(hub)  # register the worker's steps with the hub
    obj.register_with_event_loop(hub)             # register the consumer's steps with the hub

    # did_start_ok will verify that pool processes were able to start,
    # but this will only work the first time we start, as
    # maxtasksperchild will mess up metrics.
    if not obj.restart_count and not obj.pool.did_start_ok():
        raise WorkerLostError('Could not start worker processes')

    # consumer.consume() may have prefetched up to our
    # limit - drain an event so we're in a clean state
    # prior to starting our event loop.
    if connection.transport.driver_type == 'amqp':
        hub.call_soon(_quick_drain, connection)

    # FIXME: Use loop.run_forever
    # Tried and works, but no time to test properly before release.
    hub.propagate_errors = errors
    loop = hub.create_loop()                      # create the loop (a generator)

    try:
        while blueprint.state == RUN and obj.connection:  # running and still connected
            # shutdown if signal handlers told us to.
            should_stop, should_terminate = (
                state.should_stop, state.should_terminate,
            )
            # False == EX_OK, so must use is not False
            if should_stop is not None and should_stop is not False:
                raise WorkerShutdown(should_stop)
            elif should_terminate is not None and should_stop is not False:
                raise WorkerTerminate(should_terminate)

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)                        # run one iteration of the hub loop
            except StopIteration:
                loop = hub.create_loop()          # generator exhausted: create a new one
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
```

With that, the asynchronous loop is running and the worker waits for and handles events from the broker. The worker has now finished starting up.
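The control flow of asynloop, advancing the generator returned by `hub.create_loop()` one `next()` at a time, checking a stop flag between iterations and creating a fresh generator on `StopIteration`, can be reproduced in a few lines. `make_loop` and `run_event_loop` are hypothetical names, and a plain deque stands in for kombu's Hub, which in reality polls file descriptors and runs timers. `max_ticks` exists only to keep the demo finite.

```python
from collections import deque


def make_loop(queue, handled):
    """Hypothetical stand-in for hub.create_loop(): one event per next()."""
    while queue:
        # a real hub would poll file descriptors, fire callbacks and run timers here
        handled.append(queue.popleft())
        yield


def run_event_loop(queue, should_stop, max_ticks=100):
    """Drive the generator the way asynloop does, for at most max_ticks ticks."""
    handled = []
    loop = make_loop(queue, handled)
    for _ in range(max_ticks):
        if should_stop():  # asynloop checks state.should_stop here
            break
        try:
            next(loop)
        except StopIteration:
            # generator exhausted: create a fresh one, as asynloop does
            loop = make_loop(queue, handled)
    return handled


if __name__ == '__main__':
    print(run_event_loop(deque(['task-1', 'task-2']), should_stop=lambda: False))
    # ['task-1', 'task-2']
```

Keeping each iteration small is what lets the real loop check the shutdown flags and flush QoS updates between events.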

Summary

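This article continued the analysis of the worker startup process. By default Celery runs the embedded periodic-task scheduler (beat) in a child process and handles consumer-side events with asynchronous I/O. Startup also involves how the worker consumes and dispatches tasks, which was not analyzed in detail here. This concludes the overview of the worker startup process.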

Originally published at: https://blog.csdn.net/qq_33339479/article/details/80958059
