Python源码学习:多线程实现机制
发布日期:2021-07-25 13:04:30 浏览次数:8 分类:技术文章

本文共 28493 字,大约阅读时间需要 94 分钟。

Python源码分析

本文环境python2.5系列参考书籍<
>

本文分析Python中的多线程机制,主要通过一个多线程的脚本来分析多线程的基本操作与实现。

分析

本次分析的脚本如下;

import threadimport timedef f():    while True:        print("test test")        time.sleep(1)thread.start_new_thread(f, ())time.sleep(10)print("main over")

thread主要是用c实现的库,本次分析也主要分析该方法对应的执行流程。

脚本文件的字节码如下;

1           0 LOAD_CONST               0 (-1)              3 LOAD_CONST               1 (None)              6 IMPORT_NAME              0 (thread)              9 STORE_NAME               0 (thread)  2          12 LOAD_CONST               0 (-1)             15 LOAD_CONST               1 (None)             18 IMPORT_NAME              1 (time)             21 STORE_NAME               1 (time)  4          24 LOAD_CONST               2 ()             27 MAKE_FUNCTION            0             30 STORE_NAME               2 (f)  9          33 LOAD_NAME                0 (thread)             36 LOAD_ATTR                3 (start_new_thread)             39 LOAD_NAME                2 (f)             42 LOAD_CONST               5 (())             45 CALL_FUNCTION            2             48 POP_TOP              10          49 LOAD_NAME                1 (time)             52 LOAD_ATTR                4 (sleep)             55 LOAD_CONST               3 (10)             58 CALL_FUNCTION            1             61 POP_TOP              11          62 LOAD_CONST               4 ('main over')             65 PRINT_ITEM                       66 PRINT_NEWLINE                    67 LOAD_CONST               1 (None)             70 RETURN_VALUE

由于thread是使用c编写的库,首先先看下该库是怎样导入的,怎样将该库初始化的,在这里就简要分析一下import_name的执行流程;

case IMPORT_NAME:            w = GETITEM(names, oparg);                          // 获取常量列表中,获取对应的字符串            x = PyDict_GetItemString(f->f_builtins, "__import__");     // 获取在Python启动过程中就初始化的__import__方法            if (x == NULL) {                PyErr_SetString(PyExc_ImportError,                        "__import__ not found");                break;            }            v = POP();            u = TOP();            if (PyInt_AsLong(u) != -1 || PyErr_Occurred())     // 包装传入的参数                w = PyTuple_Pack(5,                        w,                        f->f_globals,                        f->f_locals == NULL ?                          Py_None : f->f_locals,                        v,                        u);            else                w = PyTuple_Pack(4,                        w,                        f->f_globals,                        f->f_locals == NULL ?                          Py_None : f->f_locals,                        v);            Py_DECREF(v);            Py_DECREF(u);            if (w == NULL) {                u = POP();                x = NULL;                break;            }            READ_TIMESTAMP(intr0);            x = PyEval_CallObject(x, w);    // 调用查找方法,将获取的方法和打包的参数传入            READ_TIMESTAMP(intr1);            Py_DECREF(w);            SET_TOP(x);            if (x != NULL) continue;            break;

此时,调用PyEval_CallObject函数将查找方法传入并传入参数;在Python启动流程中,剖析过会初始化内建方法,最后会调用Py_InitModule4方法,而该方法其中会将定义的方法进行初始化;

PyObject *Py_InitModule4(const char *name, PyMethodDef *methods, const char *doc,           PyObject *passthrough, int module_api_version){    ...    d = PyModule_GetDict(m);       if (methods != NULL) {        n = PyString_FromString(name);        if (n == NULL)            return NULL;        for (ml = methods; ml->ml_name != NULL; ml++) {            if ((ml->ml_flags & METH_CLASS) ||                (ml->ml_flags & METH_STATIC)) {                PyErr_SetString(PyExc_ValueError,                        "module functions cannot set"                        " METH_CLASS or METH_STATIC");                Py_DECREF(n);                return NULL;            }            v = PyCFunction_NewEx(ml, passthrough, n);  // 生成一个PyCFunction方法            if (v == NULL) {                Py_DECREF(n);                return NULL;            }            if (PyDict_SetItemString(d, ml->ml_name, v) != 0) {  //将生成的PyCFunction设置到对应的mod的字典中                Py_DECREF(v);                Py_DECREF(n);                return NULL;            }            Py_DECREF(v);        }        Py_DECREF(n);    }    ...}

由此可见获取的import是一个PyCFuntion对象,然后继续分析;执行函数PyEval_CallObjectWithKeywords

PyObject *PyEval_CallObjectWithKeywords(PyObject *func, PyObject *arg, PyObject *kw){    PyObject *result;    if (arg == NULL) {        arg = PyTuple_New(0);        if (arg == NULL)            return NULL;    }    else if (!PyTuple_Check(arg)) {        PyErr_SetString(PyExc_TypeError,                "argument list must be a tuple");        return NULL;    }    else        Py_INCREF(arg);    if (kw != NULL && !PyDict_Check(kw)) {        PyErr_SetString(PyExc_TypeError,                "keyword list must be a dictionary");        Py_DECREF(arg);        return NULL;    }    result = PyObject_Call(func, arg, kw);    Py_DECREF(arg);    return result;}

在进行相应的参数检查后,然后调用了PyObject_Call;

PyObject *PyObject_Call(PyObject *func, PyObject *arg, PyObject *kw){        ternaryfunc call;    if ((call = func->ob_type->tp_call) != NULL) {                 PyObject *result = (*call)(func, arg, kw);     // 此时调用func->ob_type的tp_call方法        if (result == NULL && !PyErr_Occurred())            PyErr_SetString(                PyExc_SystemError,                "NULL result without error in PyObject_Call");        return result;    }    PyErr_Format(PyExc_TypeError, "'%.200s' object is not callable",             func->ob_type->tp_name);    return NULL;}

此时传入的import是进过了包装的PyCFunction该类型ob_type设置成了PyCFuntion_Type,此时调用的就是PyCFuntion_Type中的tp_call;

PyTypeObject PyCFunction_Type = {    PyObject_HEAD_INIT(&PyType_Type)    0,    "builtin_function_or_method",    sizeof(PyCFunctionObject),    0,    (destructor)meth_dealloc,       /* tp_dealloc */    ...    (hashfunc)meth_hash,            /* tp_hash */    PyCFunction_Call,           /* tp_call */    ...};

此时我们查看PyCFunction_Call对应的函数流程;

此时由于是导入import

{
"__import__", (PyCFunction)builtin___import__, METH_VARARGS | METH_KEYWORDS, import_doc},

此时传入参数为METH_VARARGS | METH_KEYWORDS

PyObject *PyCFunction_Call(PyObject *func, PyObject *arg, PyObject *kw){    PyCFunctionObject* f = (PyCFunctionObject*)func;    PyCFunction meth = PyCFunction_GET_FUNCTION(func);    PyObject *self = PyCFunction_GET_SELF(func);    Py_ssize_t size;    switch (PyCFunction_GET_FLAGS(func) & ~(METH_CLASS | METH_STATIC | METH_COEXIST)) {  //获取传入的参数    case METH_VARARGS:        if (kw == NULL || PyDict_Size(kw) == 0)            return (*meth)(self, arg);        break;    case METH_VARARGS | METH_KEYWORDS:    case METH_OLDARGS | METH_KEYWORDS:        return (*(PyCFunctionWithKeywords)meth)(self, arg, kw);    ...}

此时此时调用的就是PyCFunction中的meth函数,此时对应的import的函数,builtin___import__方法;

static PyObject *builtin___import__(PyObject *self, PyObject *args, PyObject *kwds){    static char *kwlist[] = {
"name", "globals", "locals", "fromlist", "level", 0}; char *name; PyObject *globals = NULL; PyObject *locals = NULL; PyObject *fromlist = NULL; int level = -1; if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|OOOi:__import__", kwlist, &name, &globals, &locals, &fromlist, &level)) return NULL; return PyImport_ImportModuleLevel(name, globals, locals, fromlist, level); // 将参数解析到globals,locals,然后调用该方法}PyObject *PyImport_ImportModuleLevel(char *name, PyObject *globals, PyObject *locals, PyObject *fromlist, int level){ PyObject *result; lock_import(); result = import_module_level(name, globals, locals, fromlist, level); // 调用该函数导入库 if (unlock_import() < 0) { Py_XDECREF(result); PyErr_SetString(PyExc_RuntimeError, "not holding the import lock"); return NULL; } return result;}static PyObject *import_module_level(char *name, PyObject *globals, PyObject *locals, PyObject *fromlist, int level){ char buf[MAXPATHLEN+1]; Py_ssize_t buflen = 0; PyObject *parent, *head, *next, *tail; parent = get_parent(globals, buf, &buflen, level); // 获取当前的package环境 if (parent == NULL) return NULL; head = load_next(parent, Py_None, &name, buf, &buflen); // 加载对应的库 if (head == NULL) return NULL; tail = head; Py_INCREF(tail); while (name) { next = load_next(tail, tail, &name, buf, &buflen); Py_DECREF(tail); if (next == NULL) { Py_DECREF(head); return NULL; } tail = next; } ...}static PyObject *load_next(PyObject *mod, PyObject *altmod, char **p_name, char *buf, Py_ssize_t *p_buflen){ ... result = import_submodule(mod, p, buf); ...}

此时调用了import_submodule函数,

static PyObject *import_submodule(PyObject *mod, char *subname, char *fullname){        ...        fdp = find_module(fullname, subname, path, buf, MAXPATHLEN+1,                  &fp, &loader);  // 查找对应的文件        Py_XDECREF(path);        if (fdp == NULL) {            if (!PyErr_ExceptionMatches(PyExc_ImportError))                return NULL;            PyErr_Clear();            Py_INCREF(Py_None);            return Py_None;        }        m = load_module(fullname, fp, buf, fdp->type, loader);   //加载查找到的库        ...}static PyObject *load_module(char *name, FILE *fp, char *buf, int type, PyObject *loader){    ...    case C_BUILTIN:    case PY_FROZEN:        if (buf != NULL && buf[0] != '\0')            name = buf;        if (type == C_BUILTIN)            err = init_builtin(name);        else            err = PyImport_ImportFrozenModule(name);        if (err < 0)            return NULL;        if (err == 0) {            PyErr_Format(PyExc_ImportError,                     "Purported %s module %.200s not found",                     type == C_BUILTIN ?                        "builtin" : "frozen",                     name);            return NULL;        }        modules = PyImport_GetModuleDict();        m = PyDict_GetItemString(modules, name);        if (m == NULL) {            PyErr_Format(                PyExc_ImportError,                "%s module %.200s not properly initialized",                type == C_BUILTIN ?                    "builtin" : "frozen",                name);            return NULL;        }        Py_INCREF(m);        break;        ...}

此时由于导的thread是c实现的库,所以会调用init_builtin(name),当初始化成功后,并将该modules设置到全局的modules中;此时继续查看init_builtin;

static intinit_builtin(char *name){    struct _inittab *p;    if (_PyImport_FindExtension(name, name) != NULL)        return 1;    for (p = PyImport_Inittab; p->name != NULL; p++) { // 查找包含初始化方法的数组        if (strcmp(name, p->name) == 0) {            if (p->initfunc == NULL) {                PyErr_Format(PyExc_ImportError,                    "Cannot re-init internal module %.200s",                    name);                return -1;            }            if (Py_VerboseFlag)                PySys_WriteStderr("import %s # builtin\n", name);            (*p->initfunc)();             // 调用查找到的对应的初始化方法            if (PyErr_Occurred())                return -1;            if (_PyImport_FixupExtension(name, name) == NULL)                return -1;            return 1;        }    }    return 0;}extern struct _inittab _PyImport_Inittab[];struct _inittab *PyImport_Inittab = _PyImport_Inittab;

此时_PyImport_Inittab是定义在PC/config.c文件中;

struct _inittab {    char *name;               // 名称    void (*initfunc)(void);   // 对应的初始化方法};struct _inittab _PyImport_Inittab[] = {    ...        {
"time", inittime},#ifdef WITH_THREAD {
"thread", initthread},#endif ...}

此时根据传入的thread可以找到并调用initthread方法;

static PyMethodDef thread_methods[] = {    {
"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, {
"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, {
"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock, METH_NOARGS, allocate_doc}, {
"allocate", (PyCFunction)thread_PyThread_allocate_lock, METH_NOARGS, allocate_doc}, {
"exit_thread", (PyCFunction)thread_PyThread_exit_thread, METH_NOARGS, exit_doc}, {
"exit", (PyCFunction)thread_PyThread_exit_thread, METH_NOARGS, exit_doc}, {
"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main, METH_NOARGS, interrupt_doc}, {
"get_ident", (PyCFunction)thread_get_ident, METH_NOARGS, get_ident_doc}, {
"stack_size", (PyCFunction)thread_stack_size, METH_VARARGS, stack_size_doc},#ifndef NO_EXIT_PROG {
"exit_prog", (PyCFunction)thread_PyThread_exit_prog, METH_VARARGS},#endif {
NULL, NULL} /* sentinel */};PyMODINIT_FUNCinitthread(void){ PyObject *m, *d; /* Initialize types: */ if (PyType_Ready(&localtype) < 0) // 初始化localtype类型 return; /* Create the module and add the functions */ m = Py_InitModule3("thread", thread_methods, thread_doc); // 初始化thread对应的方法 if (m == NULL) return; /* Add a symbolic constant */ d = PyModule_GetDict(m); ThreadError = PyErr_NewException("thread.error", NULL, NULL); PyDict_SetItemString(d, "error", ThreadError); Locktype.tp_doc = lock_doc; // 设置线程锁的说明文档 Py_INCREF(&Locktype); PyDict_SetItemString(d, "LockType", (PyObject *)&Locktype); // 设置线程模块的LockType为设置的LockType,设置线程锁类型 Py_INCREF(&localtype); if (PyModule_AddObject(m, "_local", (PyObject *)&localtype) < 0) // 增加生成的模块一个属性_local对应localtype,保存对应本线程的相关值 return; /* Initialize the C thread library */ PyThread_init_thread(); // 初始化线程}

此时,我们先查看Py_InitModule3;

#define Py_InitModule3(name, methods, doc) \    Py_InitModule4(name, methods, doc, (PyObject *)NULL, \               PYTHON_API_VERSION)PyObject *Py_InitModule4(const char *name, PyMethodDef *methods, const char *doc,           PyObject *passthrough, int module_api_version){    PyObject *m, *d, *v, *n;    PyMethodDef *ml;    if (!Py_IsInitialized())      // 检查是否初始化        Py_FatalError("Interpreter not initialized (version mismatch?)");    if (module_api_version != PYTHON_API_VERSION) {        char message[512];        PyOS_snprintf(message, sizeof(message),                   api_version_warning, name,                   PYTHON_API_VERSION, name,                   module_api_version);        if (PyErr_Warn(PyExc_RuntimeWarning, message))             return NULL;    }    /* Make sure name is fully qualified.       This is a bit of a hack: when the shared library is loaded,       the module name is "package.module", but the module calls       Py_InitModule*() with just "module" for the name.  The shared       library loader squirrels away the true name of the module in       _Py_PackageContext, and Py_InitModule*() will substitute this       (if the name actually matches).    */    if (_Py_PackageContext != NULL) {        char *p = strrchr(_Py_PackageContext, '.');        if (p != NULL && strcmp(name, p+1) == 0) {            name = _Py_PackageContext;            _Py_PackageContext = NULL;        }    }    if ((m = PyImport_AddModule(name)) == NULL)  // 如果添加为空则返回为空        return NULL;    d = PyModule_GetDict(m);          // 添加获取添加的m    if (methods != NULL) {        n = PyString_FromString(name);    // 转换为string        if (n == NULL)            return NULL;        for (ml = methods; ml->ml_name != NULL; ml++) {  // 遍历获取自定义的所有方法            if ((ml->ml_flags & METH_CLASS) ||                (ml->ml_flags & METH_STATIC)) {                PyErr_SetString(PyExc_ValueError,                        "module functions cannot set"                        " METH_CLASS or METH_STATIC");                Py_DECREF(n);                return NULL;            }            v = PyCFunction_NewEx(ml, passthrough, n);  // 生成一个新的PyCFunction包装一下            if (v == NULL) {                Py_DECREF(n);                return NULL;            }            if (PyDict_SetItemString(d, ml->ml_name, v) != 0) {  // 设置到生成mod的属性字典中,对应的方法名称与方法                Py_DECREF(v);                Py_DECREF(n);                return NULL;            }            Py_DECREF(v);        }        Py_DECREF(n);    }    if (doc != NULL) {        v = PyString_FromString(doc);        if (v == NULL || PyDict_SetItemString(d, "__doc__", v) != 0) {            Py_XDECREF(v);            return NULL;        }        Py_DECREF(v);    }    return m;}

此时执行完成后,c库的thread就已经新建了,然后返回initthread方法,会继续设置参数后,最后会PyThread_init_thread()初始化线程环境;

voidPyThread_init_thread(void){#ifdef Py_DEBUG    char *p = getenv("THREADDEBUG");    if (p) {        if (*p)            thread_debug = atoi(p);        else            thread_debug = 1;    }#endif /* Py_DEBUG */    if (initialized)        return;    initialized = 1;    dprintf(("PyThread_init_thread called\n"));    PyThread__init_thread();}

如果线程环境已经初始化完成,则不再初始化,如果第一次初始化,则调用

PyThread__init_thread初始化;由于Python的设计初衷是跨平台的,所以会实现各个平台对应的不同的线程库,此时以pthread为例,此时PyThread__init_thread调用的就是pthread中的该方法;

static voidPyThread__init_thread(void){    /* DO AN INIT BY STARTING THE THREAD */    static int dummy = 0;    pthread_t thread1;    pthread_create(&thread1, NULL, (void *) _noop, &dummy);    // 创建了一个执行_noop的线程    pthread_join(thread1, NULL);                               // 直到执行完成,   检验是否可以创建线程}

此时内建thread库已经加载完成,剩下的就是调用该库的相应接口进行创建线程的操作。

当此时进行到调用线程thread.start_new_thread()时,此时字节码如下;

9          33 LOAD_NAME                0 (thread)             36 LOAD_ATTR                3 (start_new_thread)             39 LOAD_NAME                2 (f)             42 LOAD_CONST               5 (())             45 CALL_FUNCTION            2             48 POP_TOP

根据上文分析可知此时调用的就是thread中的start_new_thread时;

{
"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc},

继续查看thread_PyThread_start_new_thread;

static PyObject *thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs){    PyObject *func, *args, *keyw = NULL;    struct bootstate *boot;    long ident;    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,                       &func, &args, &keyw))     // 解压参数        return NULL;    if (!PyCallable_Check(func)) {               // 检查该func是否可调用        PyErr_SetString(PyExc_TypeError,                "first arg must be callable");        return NULL;    }    if (!PyTuple_Check(args)) {                 // 检出输入参数是否是元组        PyErr_SetString(PyExc_TypeError,                "2nd arg must be a tuple");        return NULL;    }    if (keyw != NULL && !PyDict_Check(keyw)) {        PyErr_SetString(PyExc_TypeError,                "optional 3rd arg must be a dictionary");        return NULL;    }    boot = PyMem_NEW(struct bootstate, 1);      // 申请内存    if (boot == NULL)        return PyErr_NoMemory();    boot->interp = PyThreadState_GET()->interp;     // 获取当前线程的解释器    boot->func = func;                              // 设置线程执行的函数    boot->args = args;                              // 线程执行传入的参数    boot->keyw = keyw;    Py_INCREF(func);    Py_INCREF(args);    Py_XINCREF(keyw);    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */  // 初始化线程    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);         // 生成一个线程    if (ident == -1) {        PyErr_SetString(ThreadError, "can't start new thread");        Py_DECREF(func);        Py_DECREF(args);        Py_XDECREF(keyw);        PyMem_DEL(boot);        return NULL;    }    return PyInt_FromLong(ident);}

此时的调用的PyThread_start_new_thread就是thread_pthread.h中的;

longPyThread_start_new_thread(void (*func)(void *), void *arg){    ...    status = pthread_create(&th, #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)                 &attrs,#else                 (pthread_attr_t*)NULL,#endif                 (void* (*)(void *))func,                 (void *)arg                 );    ...}

调用pthread库函数pthread_create创建线程,此时就是执行的传入的函数和方法,分别为t_bootstrap和boot;

static voidt_bootstrap(void *boot_raw){    struct bootstate *boot = (struct bootstate *) boot_raw;    PyThreadState *tstate;                          // 线程对象    PyObject *res;    tstate = PyThreadState_New(boot->interp);      // 生成一个新的保存线程数据的结构    PyEval_AcquireThread(tstate);                   // 获取GIL    res = PyEval_CallObjectWithKeywords(        boot->func, boot->args, boot->keyw);       // 执行字节码,就相当于从新进行解释器执行对应的字节码    if (res == NULL) {        if (PyErr_ExceptionMatches(PyExc_SystemExit))            PyErr_Clear();        else {            PyObject *file;            PySys_WriteStderr(                "Unhandled exception in thread started by ");            file = PySys_GetObject("stderr");            if (file)                PyFile_WriteObject(boot->func, file, 0);            else                PyObject_Print(boot->func, stderr, 0);            PySys_WriteStderr("\n");            PyErr_PrintEx(0);        }    }    else        Py_DECREF(res);    Py_DECREF(boot->func);    Py_DECREF(boot->args);    Py_XDECREF(boot->keyw);    PyMem_DEL(boot_raw);    PyThreadState_Clear(tstate);                // 清除线程对象的数据    PyThreadState_DeleteCurrent();              // 删除当前线程,并将当前线程从线程列表中移除    PyThread_exit_thread();                     // 销毁线程}

此时首先先生成当前线程的结构,然后执行相应的函数,最后销毁相应的线程数据;首先会调用PyThreadState_New;

PyThreadState *PyThreadState_New(PyInterpreterState *interp){    PyThreadState *tstate = (PyThreadState *)malloc(sizeof(PyThreadState));    if (_PyThreadState_GetFrame == NULL)        _PyThreadState_GetFrame = threadstate_getframe;             // 获取当前执行的栈帧    if (tstate != NULL) {        tstate->interp = interp;                                    // 设置当前的解释器结构        tstate->frame = NULL;        tstate->recursion_depth = 0;        tstate->tracing = 0;        tstate->use_tracing = 0;        tstate->tick_counter = 0;        tstate->gilstate_counter = 0;        tstate->async_exc = NULL;#ifdef WITH_THREAD        tstate->thread_id = PyThread_get_thread_ident();            // 获取线程的id#else        tstate->thread_id = 0;#endif        tstate->dict = NULL;        tstate->curexc_type = NULL;        tstate->curexc_value = NULL;        tstate->curexc_traceback = NULL;        tstate->exc_type = NULL;        tstate->exc_value = NULL;        tstate->exc_traceback = NULL;        tstate->c_profilefunc = NULL;        tstate->c_tracefunc = NULL;        tstate->c_profileobj = NULL;        tstate->c_traceobj = NULL;#ifdef WITH_THREAD        _PyGILState_NoteThreadState(tstate);                       // 将当前线程对象加入到线程列表中,设置对应的key,#endif        HEAD_LOCK();                                               // 加锁,将当前线程设置到线程对应列表        tstate->next = interp->tstate_head;                        // 将当前执行的线程存入当前线程的下一个        interp->tstate_head = tstate;                              // 设置解释器当前执行的线程        HEAD_UNLOCK();                                             // 释放锁    }    return tstate;}

对应_PyGILState_NoteThreadState方法就是相应的key,该key的作用就是Python中保存的所有创建的线程的一个列表,在多线程中必须必须都被保存在其中;

static void_PyGILState_NoteThreadState(PyThreadState* tstate){    /* If autoTLSkey is 0, this must be the very first threadstate created       in Py_Initialize().  Don't do anything for now (we'll be back here       when _PyGILState_Init is called). */    if (!autoTLSkey)        return;    /* Stick the thread state for this thread in thread local storage.       The only situation where you can legitimately have more than one       thread state for an OS level thread is when there are multiple       interpreters, when:           a) You shouldn't really be using the PyGILState_ APIs anyway,              and:           b) The slightly odd way PyThread_set_key_value works (see              comments by its implementation) means that the first thread              state created for that given OS level thread will "win",              which seems reasonable behaviour.    */    if (PyThread_set_key_value(autoTLSkey, (void *)tstate) < 0)        Py_FatalError("Couldn't create autoTLSkey mapping");    /* PyGILState_Release must not try to delete this thread state. */    tstate->gilstate_counter = 1;}

其中PyThread_set_key_value方法就是将当前线程设置到线程列表中去,

intPyThread_set_key_value(int key, void *value){    struct key *p;    assert(value != NULL);    p = find_key(key, value);       // 查找对应的value,如果找到则返回,没找到则创建;    if (p == NULL)        return -1;    else        return 0;}static struct key *find_key(int key, void *value){    struct key *p;    long id = PyThread_get_thread_ident();              // 获取当前线程的id    if (!keymutex)        return NULL;    PyThread_acquire_lock(keymutex, 1);                 // 专门的在加入对象列表中加锁    for (p = keyhead; p != NULL; p = p->next) {         // 遍历线程列表,找到线程id与要找的线程id进行比较        if (p->id == id && p->key == key)            goto Done;                                  // 如果找到则释放锁并返回当前的key    }    if (value == NULL) {        assert(p == NULL);        goto Done;    }    p = (struct key *)malloc(sizeof(struct key));       // 如果在已有线程列表中没有找到,就新建一个key并加入当前线程列表中    if (p != NULL) {        p->id = id;        p->key = key;        p->value = value;        p->next = keyhead;        keyhead = p;    } Done:    PyThread_release_lock(keymutex);                    // 释放锁    return p;}

单独在切换线程状态是加锁,以保证切换的过程中的原子操作;此时新建线程完成后再继续执行PyEval_AcquireThread(tstate);

int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag){    int success;    sem_t *thelock = (sem_t *)lock;    int status, error = 0;    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));    do {        if (waitflag)            status = fix_status(sem_wait(thelock));                         // 循环获取gil锁,        else            status = fix_status(sem_trywait(thelock));                          } while (status == EINTR); /* Retry if interrupted by a signal */    if (waitflag) {        CHECK_STATUS("sem_wait");    } else if (status != EAGAIN) {        CHECK_STATUS("sem_trywait");    }    success = (status == 0) ? 1 : 0;    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));    return success;                         // 获取成功则返回}

此时就等待着线程获得执行;

首先需要说明的是,当该子线程还没开始执行字节码时,主线程就一直在运行,然后子线程执行到这里后就一直等待主线程释放gil,此时主线程是在何时才会释放gil锁呢,答案就在字节码解释器中;

PyObject *PyEval_EvalFrameEx(PyFrameObject *f, int throwflag){    for (;;) {    ...        if (--_Py_Ticker < 0) {                        if (*next_instr == SETUP_FINALLY) {                                /* Make the last opcode before                                   a try: finally: block uninterruptable. */                                goto fast_next_opcode;                        }            _Py_Ticker = _Py_CheckInterval;   // 当_Py_Ticker小于0的时候,重新设置_Py_Ticker            tstate->tick_counter++;#ifdef WITH_TSC            ticked = 1;#endif            if (things_to_do) {                if (Py_MakePendingCalls() < 0) {                    why = WHY_EXCEPTION;                    goto on_error;                }                if (things_to_do)                    /* MakePendingCalls() didn't succeed.                       Force early re-execution of this                       "periodic" code, possibly after                       a thread switch */                    _Py_Ticker = 0;            }#ifdef WITH_THREAD            if (interpreter_lock) {         // 如果gil不为空                /* Give another thread a chance */                if (PyThreadState_Swap(NULL) != tstate)         // 将当前执行现场设置为NULL,并将正在进行的线程与当前对比,如果不和当前线程相同则报错                    Py_FatalError("ceval: tstate mix-up");                PyThread_release_lock(interpreter_lock);        // 释放gil锁,操作系统调度其他线程执行                /* Other threads may run now */                PyThread_acquire_lock(interpreter_lock, 1);     // 当前线程获取gil锁,等待下一次条用                if (PyThreadState_Swap(tstate) != NULL)                    Py_FatalError("ceval: orphan tstate");                /* Check for thread interrupts */                if (tstate->async_exc != NULL) {                    x = tstate->async_exc;                    tstate->async_exc = NULL;                    PyErr_SetNone(x);                    Py_DECREF(x);                    why = WHY_EXCEPTION;                    goto on_error;                }            }#endif        }    ...}

此时,主线程在执行过_Py_CheckInterval后,主线程就释放gil锁,此时就会进入操作系统线程调度,待子线程获取gil后,然后就会执行PyEval_CallObjectWithKeywords,此时子线程也会进入字节码解释器,然后子线程字节码执行完成_Py_CheckInterval后,然后释放锁,然后操作系统开始调度其他线程继续执行,此时主线程与子线程都是同样的考操作系统进行调用。

至此,Python中的多线程机制大致流程分析完成,当开启多线程时,先是主线程调用创建子线程,此时子线程会等待主线程释放gil,当主线程释放gil后,如果此时主线程获得了gil此时子线程进入字节码解释器中,然后执行相应条数的字节码后,就释放锁,供其他线程使用。

转载地址:https://blog.csdn.net/qq_33339479/article/details/79900786 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:操作系统学习:基础轮廓梳理
下一篇:Python源码学习:Python类机制分析-用户自定义类

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月22日 23时41分29秒