Celery source code analysis: the multi command
Published: 2021-07-25 13:04:37



Environment for this article: Python 3.5.2, Celery 4.0.2, Django 1.10.x.

Introduction to celery

Celery is an asynchronous task framework, a task scheduler built around the AMQP messaging model. Its usage follows the producer/consumer pattern: a producer publishes a message to a message queue, and a consumer fetches the message from the queue and processes it. This decouples the service or application (the producer) from the work being done.
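The queue-based decoupling described above can be sketched with the standard library alone. This is only an analogy for the producer/consumer flow, not Celery code; the in-process `queue.Queue` stands in for a real broker such as Redis:

```python
import queue
import threading

def demo_producer_consumer():
    """Tiny producer/consumer analogy: the queue decouples both sides."""
    broker = queue.Queue()          # stands in for the message broker (e.g. Redis)
    results = []

    def consumer():
        while True:
            msg = broker.get()      # block until a message arrives
            if msg is None:         # sentinel: shut the worker down
                break
            results.append('handled:%s' % msg)

    worker = threading.Thread(target=consumer)
    worker.start()
    for task in ('send_mail', 'resize_image'):
        broker.put(task)            # the producer only enqueues and moves on
    broker.put(None)
    worker.join()
    return results
```

The producer never waits for a task to finish; it merely enqueues, exactly the property that lets a web request return before the activation email is sent.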

Typical use cases

As an asynchronous task framework, celery not only handles task distribution and scheduling but also supports periodic tasks, and it works as a distributed task scheduler. Typical scenarios: when a user registers, an activation email usually has to be sent, and sending email is an IO-heavy operation that would otherwise add to the user's waiting time; when a web application needs to perform a large amount of file IO, those jobs can be shipped to dedicated servers so the file operations do not hog the resources of the machine serving requests. Both cases can be handled by an asynchronous task framework, decoupling the work and improving response times.

A quick look at the celery command

Following the official documentation, Redis is chosen as the message broker. Once Celery is installed, typing celery in a terminal produces:

(venv) wuzideMacBook-Air:~ wuzi$ celery
usage: celery <command> [options]

Show help screen and exit.

positional arguments:
  args

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit

Global Options:
  -A APP, --app APP
  -b BROKER, --broker BROKER
  --loader LOADER
  --config CONFIG
  --workdir WORKDIR
  --no-color, -C
  --quiet, -q

---- -- - - ---- Commands- -------------- --- ------------

+ Main:
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control:
|    celery status
|    celery inspect --help
|    celery inspect active
|    celery inspect active_queues
|    celery inspect clock
|    celery inspect conf [include_defaults=False]
|    celery inspect memdump [n_samples=10]
|    celery inspect memsample
|    celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
|    celery inspect ping
|    celery inspect query_task [id1 [id2 [... [idN]]]]
|    celery inspect registered [attr1 [attr2 [... [attrN]]]]
|    celery inspect report
|    celery inspect reserved
|    celery inspect revoked
|    celery inspect scheduled
|    celery inspect stats
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max [min]]
|    celery control cancel_consumer <queue>
|    celery control disable_events
|    celery control election
|    celery control enable_events
|    celery control heartbeat
|    celery control pool_grow [N=1]
|    celery control pool_restart
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit>
|    celery control revoke [id1 [id2 [... [idN]]]]
|    celery control shutdown
|    celery control terminate <signal> [id1 [id2 [... [idN]]]]
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils:
|    celery purge
|    celery list
|    celery call
|    celery result
|    celery migrate
|    celery graph
|    celery upgrade

+ Debugging:
|    celery report
|    celery logtool

---- -- - - --------- -- - -------------- --- ------------

Type 'celery <command> --help' for help using a specific command.

The output shows that celery's main subcommands are worker, events, beat, shell, multi and amqp. This article focuses on multi, the command Celery provides for managing a cluster of workers.

celery's background management command: multi

Now type in the terminal:

(venv) wuzideMacBook-Air:~ wuzi$ celery multi
celery multi v4.0.0 (latentcall)
usage: multi start <node1 node2 nodeN|range> [worker options]
       multi stop <n1 n2 nN|range> [-SIG (default: -TERM)]
       multi restart <n1 n2 nN|range> [-SIG] [worker options]
       multi kill <n1 n2 nN|range>

       multi show <n1 n2 nN|range> [worker options]
       multi get hostname <n1 n2 nN|range> [-qv] [worker options]
       multi names <n1 n2 nN|range>
       multi expand template <n1 n2 nN|range>
       multi help

additional options (must appear after command name):

    * --nosplash:   Don't display program info.
    * --quiet:      Don't show as much output.
    * --verbose:    Show more output.
    * --no-color:   Don't display colors.

The terminal output shows that multi's basic subcommands include start, stop, restart and kill. Here Celery is used together with Django, so startup follows the official documentation's instructions for the Django integration.

How celery starts

Since this article assumes the Django integration, enter in the terminal:

celery multi start t1 -A wangqian t2 -A wangqian

Here -A specifies the app, and wangqian is the root package of the Django project. Let's begin the source analysis with celery's entry function.

def main():
    """Entrypoint to the ``celery`` umbrella command."""
    if 'multi' not in sys.argv:         # check whether 'multi', the background management command, was given
        maybe_patch_concurrency()       # if not, patch concurrency when gevent or eventlet is used as the pool
    from celery.bin.celery import main as _main
    _main()                             # the real entry function

Next, look at the main function in celery.bin.celery:

def main(argv=None):
    """Start celery umbrella command."""
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    try:
        if __name__ != '__main__':  # pragma: no cover
            sys.modules['__main__'] = sys.modules[__name__]
        cmd = CeleryCommand()                       # instantiate CeleryCommand
        cmd.maybe_patch_concurrency()               # check whether the pool needs to be swapped
        from billiard import freeze_support
        freeze_support()
        cmd.execute_from_commandline(argv)          # run the command
    except KeyboardInterrupt:
        pass

Besides checking the pool options, this calls execute_from_commandline on the CeleryCommand instance:

class CeleryCommand(Command):
    """Base class for commands."""

    commands = {
        'amqp': amqp,
        'beat': beat,
        'call': call,
        'control': control,
        'events': events,
        'graph': graph,
        'help': help,
        'inspect': inspect,
        'list': list_,
        'logtool': logtool,
        'migrate': migrate,
        'multi': multi,
        'purge': purge,
        'report': report,
        'result': result,
        'shell': shell,
        'status': status,
        'upgrade': upgrade,
        'worker': worker,
    }
    ext_fmt = '{self.namespace}.commands'
    enable_config_from_cmdline = True
    prog_name = 'celery'
    namespace = 'celery'

    def execute_from_commandline(self, argv=None):
        argv = sys.argv if argv is None else argv       # fall back to sys.argv when no argv was passed in
        if 'multi' in argv[1:3]:  # Issue 1008
            self.respects_app_option = False
        try:
            sys.exit(determine_exit_status(
                super(CeleryCommand, self).execute_from_commandline(argv)))  # delegate to the parent class
        except KeyboardInterrupt:
            sys.exit(EX_FAILURE)

As the class shows, CeleryCommand inherits from the base Command class, and the call above ends up in the parent's execute_from_commandline method:

def execute_from_commandline(self, argv=None):
    """Execute application from command-line.

    Arguments:
        argv (List[str]): The list of command-line arguments.
            Defaults to ``sys.argv``.
    """
    if argv is None:
        argv = list(sys.argv)                           # use the script's own arguments when none are given
    # Should we load any special concurrency environment?
    self.maybe_patch_concurrency(argv)
    self.on_concurrency_setup()

    # Dump version and exit if '--version' arg set.
    self.early_version(argv)                            # print the version and exit if requested
    argv = self.setup_app_from_commandline(argv)        # parse the arguments and extract the relevant options
    self.prog_name = os.path.basename(argv[0])          # name of the running program
    return self.handle_argv(self.prog_name, argv[1:])   # hand off for processing

This calls setup_app_from_commandline, which parses the input arguments and prepares everything needed to run:

def setup_app_from_commandline(self, argv):
    preload_options = self.parse_preload_options(argv)          # parse options such as broker, workdir, etc.
    quiet = preload_options.get('quiet')
    if quiet is not None:
        self.quiet = quiet
    try:
        self.no_color = preload_options['no_color']             # configure colored output
    except KeyError:
        pass
    workdir = preload_options.get('workdir')                    # configured working directory
    if workdir:
        os.chdir(workdir)                                       # switch to it when set
    app = (preload_options.get('app') or                        # here the app option is the 'wangqian' argument
           os.environ.get('CELERY_APP') or
           self.app)                                            # resolve the app value
    preload_loader = preload_options.get('loader')              # loader class
    if preload_loader:                                          # export it to the environment when given
        # Default app takes loader from this env (Issue #1066).
        os.environ['CELERY_LOADER'] = preload_loader
    loader = (preload_loader or
              os.environ.get('CELERY_LOADER') or
              'default')
    broker = preload_options.get('broker', None)                # broker option
    if broker:
        os.environ['CELERY_BROKER_URL'] = broker                # export it to the environment when set
    config = preload_options.get('config')                      # config option
    if config:
        os.environ['CELERY_CONFIG_MODULE'] = config             # export it when set
    if self.respects_app_option:                                # for the multi command this was set to False
        if app:                                                 # an app was passed on the command line
            self.app = self.find_app(app)                       # import and return the app instance
        elif self.app is None:
            self.app = self.get_app(loader=loader)
        if self.enable_config_from_cmdline:                     # should configuration be read from the command line?
            argv = self.process_cmdline_config(argv)            # if so, parse it
    else:
        self.app = Celery(fixups=[])                            # otherwise instantiate a bare Celery app
    self._handle_user_preload_options(argv)
    return argv
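The app/broker/config resolution above always follows the same precedence chain: explicit command-line value first, then an environment variable, then a default. A minimal sketch of that pattern (the function name is illustrative, not Celery's API):

```python
import os

def resolve_option(cli_value, env_name, default, environ=None):
    """Command line wins over the environment, which wins over the default."""
    environ = os.environ if environ is None else environ
    return cli_value or environ.get(env_name) or default
```

setup_app_from_commandline applies this chain for the app (`-A` vs. CELERY_APP), the broker (`-b` vs. CELERY_BROKER_URL) and the loader.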

The parsed arguments are returned, and execution resumes in execute_from_commandline with self.handle_argv(self.prog_name, argv[1:]). Since handle_argv is overridden in CeleryCommand, the override runs:

def handle_argv(self, prog_name, argv, **kwargs):
    self.prog_name = self.prepare_prog_name(prog_name)      # resolve the program name
    argv = self._relocate_args_from_start(argv)             # reorder the arguments
    _, argv = self.prepare_args(None, argv)                 # validate the arguments
    try:
        command = argv[0]                                   # the command to execute
    except IndexError:
        command, argv = 'help', ['help']
    return self.execute(command, argv)                      # dispatch to the command

At this point command is the 'multi' value from the input, and self.execute runs:

def execute(self, command, argv=None):
    try:
        cls = self.commands[command]                        # look up the handler class for the command
    except KeyError:
        cls, argv = self.commands['help'], ['help']         # fall back to help when not found
    cls = self.commands.get(command) or self.commands['help']
    try:
        return cls(
            app=self.app, on_error=self.on_error,
            no_color=self.no_color, quiet=self.quiet,
            on_usage_error=partial(self.on_usage_error, command=command),
        ).run_from_argv(self.prog_name, argv[1:], command=argv[0])  # instantiate the class, then call run_from_argv
    except self.UsageError as exc:
        self.on_usage_error(exc)
        return exc.status
    except self.Error as exc:
        self.on_error(exc)
        return exc.status

This instantiates the multi class defined in celery/bin/celery.py and calls its run_from_argv method.

celery command: multi

The multi class looks like this:

class multi(Command):
    """Start multiple worker instances."""

    respects_app_option = False

    def run_from_argv(self, prog_name, argv, command=None):
        from celery.bin.multi import MultiTool
        cmd = MultiTool(quiet=self.quiet, no_color=self.no_color)   # instantiate MultiTool
        return cmd.execute_from_commandline([command] + argv)       # call execute_from_commandline on the instance

Next, MultiTool's execute_from_commandline method:

def execute_from_commandline(self, argv, cmd=None):
    # Reserve the --nosplash|--quiet|-q/--verbose options.
    argv = self._handle_reserved_options(argv)          # copy any reserved options from argv onto the instance
    self.cmd = cmd if cmd is not None else self.cmd     # resolve the cmd value
    self.prog_name = os.path.basename(argv.pop(0))
    if not self.validate_arguments(argv):               # check that the arguments are valid
        return self.error()
    return self.call_command(argv[0], argv[1:])         # dispatch to the subcommand

def validate_arguments(self, argv):
    return argv and argv[0][0] != '-'

def call_command(self, command, argv):
    try:
        return self.commands[command](*argv) or EX_OK   # run the method mapped to the subcommand
    except KeyError:
        return self.error('Invalid command: {0}'.format(command))

Here the command is start, so the start entry of MultiTool.commands, i.e. self.start, is invoked:

@splash
@using_cluster
def start(self, cluster):
    self.note('> Starting nodes...')
    return int(any(cluster.start()))

Python decorators are used here; let's look at the splash and using_cluster functions.

def splash(fun):
    @wraps(fun)
    def _inner(self, *args, **kwargs):
        self.splash()
        return fun(self, *args, **kwargs)
    return _inner

This wrapper simply calls MultiTool's splash method first, which prints the program banner with the configured colors, before running the wrapped method.
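The splash pattern is an ordinary wrapping decorator: functools.wraps keeps the wrapped method's name and docstring intact while a side effect runs first. A self-contained sketch of the same shape (MiniTool and splash_like are hypothetical names, not Celery code):

```python
from functools import wraps

def splash_like(fun):
    """Run a side effect (here: recording a banner) before the real method."""
    @wraps(fun)
    def _inner(self, *args, **kwargs):
        self.calls.append('splash')
        return fun(self, *args, **kwargs)
    return _inner

class MiniTool:
    def __init__(self):
        self.calls = []

    @splash_like
    def start(self):
        """Start the nodes."""
        self.calls.append('start')
        return 0
```

After `MiniTool().start()`, calls is `['splash', 'start']`, and thanks to wraps, `MiniTool.start.__doc__` is still the original docstring.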

def using_cluster(fun):
    @wraps(fun)
    def _inner(self, *argv, **kwargs):
        return fun(self, self.cluster_from_argv(argv), **kwargs)
    return _inner

When fun, the real start method, is finally called, self.cluster_from_argv(argv) is passed in as its argument. The relevant MultiTool methods are:

def _nodes_from_argv(self, argv, cmd=None):
    cmd = cmd if cmd is not None else self.cmd              # resolve the command
    p = self.OptionParser(argv)                             # parse the input arguments
    p.parse()
    return p, self.MultiParser(cmd=cmd).parse(p)            # parse and generate Node instances

def cluster_from_argv(self, argv, cmd=None):
    _, cluster = self._cluster_from_argv(argv, cmd=cmd)     # parse the arguments
    return cluster

def _cluster_from_argv(self, argv, cmd=None):
    p, nodes = self._nodes_from_argv(argv, cmd=cmd)         # break the arguments down into nodes
    return p, self.Cluster(list(nodes), cmd=cmd)            # return an initialized Cluster instance

The concrete node instances are produced by self.MultiParser(cmd=cmd).parse(p), in MultiParser's parse function:

def parse(self, p):
    ...
    return (
        self._node_from_options(
            p, name, prefix, suffix, cmd, append, options)
        for name in names
    )   # call _node_from_options to build each Node instance

_node_from_options is as follows:

def _node_from_options(self, p, name, prefix,
                       suffix, cmd, append, options):
    namespace, nodename, _ = build_nodename(name, prefix, suffix)
    namespace = nodename if nodename in p.namespaces else namespace
    return Node(nodename, cmd, append,
                p.optmerge(namespace, options), p.passthrough)  # instantiate a Node from the parsed options
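build_nodename expands short names like t1 into full node names of the form t1@&lt;hostname&gt;, which is why the example later launches a worker named t1@wuzi. A simplified, hypothetical re-implementation of that naming rule (not Celery's exact signature):

```python
import socket

def simple_nodename(name, default_host=None):
    """Expand 't1' to ('t1', 't1@<host>'); keep explicit 'name@host' forms."""
    host = default_host or socket.gethostname()
    if '@' in name:
        shortname, _, host = name.partition('@')    # an explicit host wins
    else:
        shortname = name
    return shortname, '%s@%s' % (shortname, host)
```

The short name is later reused for per-node files such as t1.pid and t1%I.log.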

So when start finally executes, its cluster argument is the Cluster built from the nodes that were parsed out of the input arguments:

def start(self, cluster):
    self.note('> Starting nodes...')
    return int(any(cluster.start()))

Calling start therefore calls the cluster's start method:

class Cluster(UserList):
    """Represent a cluster of workers."""
    ...

    def start(self):
        return [self.start_node(node) for node in self]     # iterate over the nodes and start each one

    def start_node(self, node):
        maybe_call(self.on_node_start, node)                # invoke on_node_start when callable
        retcode = self._start_node(node)                    # start the node
        maybe_call(self.on_node_status, node, retcode)
        return retcode                                      # return its exit code

    def _start_node(self, node):
        return node.start(
            self.env,
            on_spawn=self.on_child_spawn,
            on_signalled=self.on_child_signalled,
            on_failure=self.on_child_failure,
        )                                                   # call the node instance's start method

    @property
    def data(self):
        return self.nodes

start_node thus runs each node's start method in turn:

def start(self, env=None, **kwargs):
    return self._waitexec(
        self.argv, path=self.executable, env=env, **kwargs)     # start a child process via _waitexec

def _waitexec(self, argv, path=sys.executable, env=None,
              on_spawn=None, on_signalled=None, on_failure=None):
    argstr = self.prepare_argv(argv, path)                      # build the startup arguments
    maybe_call(on_spawn, self, argstr=' '.join(argstr), env=env)
    pipe = Popen(argstr, env=env)                               # launch as a child process
    return self.handle_process_exit(
        pipe.wait(),                                            # wait for the child to exit and collect its code
        on_signalled=on_signalled,
        on_failure=on_failure,
    )

def handle_process_exit(self, retcode, on_signalled=None, on_failure=None):
    if retcode < 0:                                             # the child was killed by a signal
        maybe_call(on_signalled, self, -retcode)
        return -retcode
    elif retcode > 0:
        maybe_call(on_failure, self, retcode)                   # nonzero exit: call on_failure when provided
    return retcode

The key point is that the -m and --detach arguments are added to argstr, so the child process starts as described above, but with arguments like worker --detach -A wangqian -n t1@wuzi --pidfile=t1.pid --logfile=t1%I.log.
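The spawn-and-wait behaviour of _waitexec can be reproduced with the stdlib subprocess module; this sketch keeps only the exit-code handling shown above and is not Celery's implementation:

```python
import subprocess

def waitexec_sketch(argv):
    """Spawn a child, wait for it, and normalize the exit code like _waitexec."""
    pipe = subprocess.Popen(argv)       # launch the child process
    retcode = pipe.wait()               # block until it exits
    if retcode < 0:                     # negative: killed by signal -retcode
        return -retcode
    return retcode
```

multi considers a node started as soon as this returns 0; as the next section shows, the detached worker's foreground process exits almost immediately, leaving the daemonized grandchild running.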

Running the worker in the background

Startup now proceeds exactly as analyzed above, except that the command CeleryCommand resolves is worker, so cls in CeleryCommand.execute is the worker class:

return cls(
    app=self.app, on_error=self.on_error,
    no_color=self.no_color, quiet=self.quiet,
    on_usage_error=partial(self.on_usage_error, command=command),
).run_from_argv(self.prog_name, argv[1:], command=argv[0])      # instantiate the class, then call run_from_argv

The worker's handling is as follows:

def run_from_argv(self, prog_name, argv=None, command=None):
    argv = [x for x in argv if x not in self.removed_flags]     # drop any removed flags from the arguments
    command = sys.argv[0] if command is None else command       # resolve the command being executed
    argv = sys.argv[1:] if argv is None else argv               # resolve the arguments
    # parse options before detaching so errors can be handled.
    options, args = self.prepare_args(
        *self.parse_options(prog_name, argv, command))          # parsed option values
    self.maybe_detach([command] + argv)                         # decide whether to run in the background
    return self(*args, **options)                               # invoke __call__

def maybe_detach(self, argv, dopts=['-D', '--detach']):
    if any(arg in argv for arg in dopts):                       # -D or --detach was given
        argv = [v for v in argv if v not in dopts]              # strip -D / --detach
        # will never return
        detached_celeryd(self.app).execute_from_commandline(argv)   # run the worker in the background
        raise SystemExit(0)                                     # exit with status 0

Let's continue with the detached_celeryd class:

def execute_from_commandline(self, argv=None):
    argv = sys.argv if argv is None else argv
    prog_name = os.path.basename(argv[0])
    config, argv = self._split_command_line_config(argv)
    options, leftovers = self.parse_options(prog_name, argv[1:])
    sys.exit(detach(
        app=self.app, path=self.execv_path,
        argv=self.execv_argv + leftovers + config,
        **vars(options)
    ))

After the argument checks, the detach function is called:

def detach(path, argv, logfile=None, pidfile=None, uid=None,
           gid=None, umask=None, workdir=None, fake=False, app=None,
           executable=None, hostname=None):
    """Detach program by argv."""
    hostname = default_nodename(hostname)               # resolve the hostname
    logfile = node_format(logfile, hostname)            # expand placeholders in logfile
    pidfile = node_format(pidfile, hostname)            # expand placeholders in pidfile
    fake = 1 if C_FAKEFORK else fake
    with detached(logfile, pidfile, uid, gid, umask, workdir, fake,
                  after_forkers=False):
        try:
            if executable is not None:
                path = executable
            os.execv(path, [path] + argv)               # replace the process image via execv
        except Exception:  # pylint: disable=broad-except
            if app is None:
                from celery import current_app
                app = current_app
            app.log.setup_logging_subsystem(
                'ERROR', logfile, hostname=hostname)
            logger.critical("Can't exec %r", ' '.join([path] + argv),
                            exc_info=True)
        return EX_FAILURE

Next, the detached function: its return value implements Python's context-manager protocol, i.e. the __enter__ and __exit__ methods.

def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
             workdir=None, fake=False, **opts):
    """Detach the current process in the background (daemonize)."""
    if not resource:
        raise RuntimeError('This platform does not support detach.')
    workdir = os.getcwd() if workdir is None else workdir           # default to the current working directory
    signals.reset('SIGCLD')  # Make sure SIGCLD is using the default handler.
    maybe_drop_privileges(uid=uid, gid=gid)

    def after_chdir_do():                           # check the logfile is writable and the pidfile is not stale
        # Since without stderr any errors will be silently suppressed,
        # we need to know that we have access to the logfile.
        logfile and open(logfile, 'a').close()
        # Doesn't actually create the pidfile, but makes sure it's not stale.
        if pidfile:
            _create_pidlock(pidfile).release()

    return DaemonContext(
        umask=umask, workdir=workdir, fake=fake, after_chdir=after_chdir_do,
    )

The DaemonContext class implements the context-manager protocol and performs the forks so that the final child re-executes the program while the intermediate processes exit:

class DaemonContext(object):
    """Context manager daemonizing the process."""

    _is_open = False

    def __init__(self, pidfile=None, workdir=None, umask=None,
                 fake=False, after_chdir=None, after_forkers=True,
                 **kwargs):
        if isinstance(umask, string_t):
            # octal or decimal, depending on initial zero.
            umask = int(umask, 8 if umask.startswith('0') else 10)
        self.workdir = workdir or DAEMON_WORKDIR
        self.umask = umask
        self.fake = fake
        self.after_chdir = after_chdir
        self.after_forkers = after_forkers
        self.stdfds = (sys.stdin, sys.stdout, sys.stderr)

    def redirect_to_null(self, fd):
        if fd is not None:
            dest = os.open(os.devnull, os.O_RDWR)
            os.dup2(dest, fd)

    def open(self):
        if not self._is_open:
            if not self.fake:
                self._detach()              # turn the process into a daemon
            os.chdir(self.workdir)
            if self.umask is not None:
                os.umask(self.umask)
            if self.after_chdir:
                self.after_chdir()
            if not self.fake:
                # We need to keep /dev/urandom from closing because
                # shelve needs it, and Beat needs shelve to start.
                keep = list(self.stdfds) + fd_by_path(['/dev/urandom'])
                close_open_fds(keep)
                for fd in self.stdfds:
                    self.redirect_to_null(maybe_fileno(fd))
                if self.after_forkers and mputil is not None:
                    mputil._run_after_forkers()
            self._is_open = True
    __enter__ = open

    def close(self, *args):
        if self._is_open:
            self._is_open = False
    __exit__ = close

    def _detach(self):
        if os.fork() == 0:      # first child
            os.setsid()         # create new session
            if os.fork() > 0:   # pragma: no cover
                # second child
                os._exit(0)
        else:
            os._exit(0)
        return self

The program the surviving child re-executes via os.execv is this very program, rerun with the -D/--detach arguments stripped. Meanwhile pipe.wait() in the node's parent process returns status code 0, so multi moves on and starts the next node the same way, until every node has been launched.
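The _detach method above is the classic Unix double-fork daemonization recipe: the first fork lets the original process carry on, setsid detaches the child from the controlling terminal, and the second fork guarantees the survivor is not a session leader and can never reacquire a terminal. A POSIX-only sketch of that skeleton (simplified; no fd redirection or chdir):

```python
import os

def double_fork(run_daemon):
    """Classic daemonization skeleton: only the grandchild runs run_daemon."""
    pid = os.fork()
    if pid > 0:                 # original process: reap the first child, carry on
        os.waitpid(pid, 0)
        return
    os.setsid()                 # first child: new session, no controlling terminal
    if os.fork() > 0:           # second fork: the session leader exits...
        os._exit(0)
    run_daemon()                # ...and the orphaned grandchild is the daemon
    os._exit(0)
```

In Celery, run_daemon corresponds to the os.execv call inside the detached context, which replaces the grandchild's process image with a fresh worker.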

multi's stop command

Once the background nodes are running, stopping a node only requires typing in the terminal:

celery multi stop t1 -A wangqian

This invokes the stop entry of MultiTool.commands, i.e. self.stop:

@splash
@using_cluster_and_sig
def stop(self, cluster, sig, **kwargs):
    return cluster.stop(sig=sig, **kwargs)

As in the analysis above, splash and using_cluster_and_sig are applied as decorators. Here is using_cluster_and_sig:

def using_cluster_and_sig(fun):
    @wraps(fun)
    def _inner(self, *argv, **kwargs):
        p, cluster = self._cluster_from_argv(argv)      # build the cluster
        sig = self._find_sig_argument(p)                # look for a signal value among the arguments
        return fun(self, cluster, sig, **kwargs)
    return _inner

This ends up calling the cluster's stop method; let's see what it does:

def stop(self, retry=None, callback=None, sig=signal.SIGTERM):
    return self._stop_nodes(retry=retry, on_down=callback, sig=sig)         # stop the running nodes

def stopwait(self, retry=2, callback=None, sig=signal.SIGTERM):
    return self._stop_nodes(retry=retry, on_down=callback, sig=sig)

def _stop_nodes(self, retry=None, on_down=None, sig=signal.SIGTERM):
    on_down = on_down if on_down is not None else self.on_node_down         # fall back to the on_node_down callback
    nodes = list(self.getpids(on_down=on_down))                             # all nodes that have a pid
    if nodes:
        for node in self.shutdown_nodes(nodes, sig=sig, retry=retry):       # stop each node
            maybe_call(on_down, node)

def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None):
    P = set(nodes)                                                          # deduplicate
    maybe_call(self.on_stopping_preamble, nodes)                            # invoke when registered
    to_remove = set()                                                       # nodes to remove
    for node in P:
        maybe_call(self.on_send_signal, node, signal_name(sig))             # invoke when registered
        if not node.send(sig, self.on_node_signal_dead):                    # send the signal via node.send,
                                                                            # with on_node_signal_dead as error callback
            to_remove.add(node)                                             # process already gone: mark it removed
            yield node                                                      # yield the stopped node
    P -= to_remove                                                          # drop the stopped nodes from P
    if retry:                                                               # a retry interval was given
        maybe_call(self.on_still_waiting_for, P)
        its = 0
        while P:
            to_remove = set()                                               # reset the removal set
            for node in P:                                                  # iterate over the remaining nodes
                its += 1
                maybe_call(self.on_still_waiting_progress, P)
                if not node.alive():                                        # probe the process with signal 0;
                                                                            # it has exited
                    maybe_call(self.on_node_shutdown_ok, node)
                    to_remove.add(node)                                     # mark it removed
                    yield node                                              # yield the node
                    maybe_call(self.on_still_waiting_for, P)
                    break
            P -= to_remove                                                  # drop the removed nodes from P
            if P and not its % len(P):                                      # P non-empty and its divisible by len(P)
                sleep(float(retry))                                         # sleep for retry seconds
        maybe_call(self.on_still_waiting_end)

As the code shows, stopping a node means calling node.send to deliver the requested signal to the node's process:

def send(self, sig, on_error=None):
    pid = self.pid                                  # the node's pid
    if pid:                                         # a pid is recorded
        try:
            os.kill(pid, sig)                       # deliver signal sig to the process via the kill syscall
        except OSError as exc:
            if exc.errno != errno.ESRCH:
                raise
            maybe_call(on_error, self)
            return False                            # failure: the process no longer exists
        return True                                 # success
    maybe_call(on_error, self)

With that, the stop command has finished shutting down the requested nodes.
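Both node.send above and the node.alive check in shutdown_nodes rest on os.kill: a real signal stops the worker, while signal number 0 delivers nothing and merely reports whether the pid exists. A standalone sketch of the liveness probe (illustrative function name, not Celery's API):

```python
import os

def node_alive(pid):
    """Probe a pid with signal 0: nothing is delivered, existence is checked."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:      # errno ESRCH: no such process
        return False
    except PermissionError:         # errno EPERM: it exists but belongs to another user
        return True
    return True
```

This is exactly why shutdown_nodes can poll `not node.alive()` in its retry loop to detect that a SIGTERM'd worker has finally exited.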

Summary: multi

This completes the walkthrough of multi's basic subcommands such as start and stop. The overall design of multi is: a Cluster object manages the generated child nodes; each worker is launched as a background (daemonized) process, and its pid is recorded in a pidfile; when a particular background worker needs to be stopped, the pid is read back from the pidfile and the appropriate signal is sent to that process. That is how the management and control are achieved.
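Putting the summary together, the stop path reduces to: read the pid recorded at start time from the pidfile, then signal that process. A minimal sketch (the file handling here is illustrative, not Celery's pidlock implementation):

```python
import os
import signal

def stop_from_pidfile(pidfile, sig=signal.SIGTERM):
    """Read a worker's pid from its pidfile and deliver a signal to it."""
    with open(pidfile) as f:
        pid = int(f.read().strip())
    os.kill(pid, sig)               # ask the worker process to shut down
    return pid
```

Keeping the pid in a file is what lets a completely separate `celery multi stop` invocation, run later and in a different process, find and control workers it did not start itself.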

Reprinted from: https://blog.csdn.net/qq_33339479/article/details/80949216

