代码覆盖率原理分析:sys.settrace流程分析
发布日期:2021-07-25 13:04:54 浏览次数:20 分类:技术文章

本文共 12392 字,大约阅读时间需要 41 分钟。

sys.settrace分析环境

本文环境python3.5.2

sys.settrace函数执行

首先我们继续查看示例代码如下:

import sysdef trace(frame, event, arg_unused):    print(frame.f_lineno, event, arg_unused)    return tracesys.settrace(trace)with open('test_file.py', "rb") as f:    source = f.read()code = compile(source, 'test_file.py', 'exec')exec(code)

继续使用上文分析过的脚本test_file.py文件,test_file.py文件如下:

import sysimport osdef test_a():    a = 1def test_b():    b = 2def run():    if 1:        r = 1    else:        r = 2    test_a()    ret = rrun()

此时运行脚本输出的内容如下:

1 call None1 line None3 line None6 line None10 line None14 line None23 line None14 call None16 line None19 line None6 call None7 line None7 return None20 line None20 return None23 return None

此时,我们继续修改示例代码,将trace函数中注释掉return trace这一行,此时继续运行,输出结果如下:

1 call None14 call None6 call None

此时的输出结果如上所述,为什么此时两次运行的结果会出现差异呢,本文我们就分析sys.settrace的整个执行流程。

sys.settrace函数的源码流程

首先,在Python3.5.2的源码中找到sysmodule.c文件,该文件中就是c实现的sys模块的相关函数;

static PyMethodDef sys_methods[] = {    ...    {"settrace",        sys_settrace, METH_O, settrace_doc},    {"gettrace",        sys_gettrace, METH_NOARGS, gettrace_doc},    ...    {NULL,              NULL}           /* sentinel */};

此时调用的sys.settrace函数就是调用了sys_settrace函数;

static PyObject *sys_settrace(PyObject *self, PyObject *args){    if (trace_init() == -1)                          // 初始化trace        return NULL;    if (args == Py_None)        PyEval_SetTrace(NULL, NULL);                 // 如果初始化传入的函数为None则设置空    else        PyEval_SetTrace(trace_trampoline, args);     // 否则设置该函数    Py_INCREF(Py_None);    return Py_None;                                  // 返回None}static inttrace_init(void){    static char *whatnames[7] = {"call", "exception", "line", "return",                                    "c_call", "c_exception", "c_return"};       // 定义了7个事件对应的描述字符    PyObject *name;    int i;    for (i = 0; i < 7; ++i) {        if (whatstrings[i] == NULL) {            name = PyUnicode_InternFromString(whatnames[i]);                    // 遍历转换为python字符串            if (name == NULL)                return -1;            whatstrings[i] = name;                                              // 设置对应的Python字符串        }    }    return 0;}#define PyTrace_CALL 0#define PyTrace_EXCEPTION 1#define PyTrace_LINE 2#define PyTrace_RETURN 3#define PyTrace_C_CALL 4#define PyTrace_C_EXCEPTION 5#define PyTrace_C_RETURN 6

此时,主要就是初始化对应的字符串,初始化完成的字符会作为参数返回作为参数传入回调函数,此时继续查看;

PyEval_SetTrace(trace_trampoline, args)

该行代码执行如下;

voidPyEval_SetTrace(Py_tracefunc func, PyObject *arg){    PyThreadState *tstate = PyThreadState_GET();                                // 获取当前的线程状态    PyObject *temp = tstate->c_traceobj;                                        // 获取当前线程的c_traceobj    _Py_TracingPossible += (func != NULL) - (tstate->c_tracefunc != NULL);          Py_XINCREF(arg);    tstate->c_tracefunc = NULL;                                                 // 设置当前线程的c_tracefunc为空    tstate->c_traceobj = NULL;    /* Must make sure that profiling is not ignored if 'temp' is freed */    tstate->use_tracing = tstate->c_profilefunc != NULL;    Py_XDECREF(temp);    tstate->c_tracefunc = func;                                                 // 保存该函数    tstate->c_traceobj = arg;                                                   // 保存该回调函数    /* Flag that tracing or profiling is turned on */    tstate->use_tracing = ((func != NULL)                           || (tstate->c_profilefunc != NULL));                 // 是否开启记录}static inttrace_trampoline(PyObject *self, PyFrameObject *frame,                 int what, PyObject *arg){    PyObject *callback;    PyObject *result;    if (what == PyTrace_CALL)                                         // 如果是call 行为        callback = self;                                              // callback 设置为self    else        callback = frame->f_trace;                                    // 设置为frame对应的f_trace    if (callback == NULL)                                             // 如果callback为0则返回        return 0;    result = call_trampoline(callback, frame, what, arg);             // 调用回调函数并处理    if (result == NULL) {                                             // 如果处理结果为空则设置为空        PyEval_SetTrace(NULL, NULL);        Py_CLEAR(frame->f_trace);                                     // 清空f_trace函数        return -1;    }    if (result != Py_None) {                                          // 如果结果不为空        PyObject *temp = frame->f_trace;                              // 获取frame的 f_trace        frame->f_trace = NULL;                                        // 设置为空        Py_XDECREF(temp);           frame->f_trace = result;                                      // 重新设置处理回调函数    }    else {        Py_DECREF(result);    }    return 0;}

此时核心的工作函数就是call_trampoline函数,该函数等待下文执行的时候分析,此时settrace的初始化过程都已经完成,初始化完成之后,此时就开始了python整个脚本的执行。此时就会进入ceval.c文件中的PyEval_EvalFrameEx函数,该函数开头执行的过程中会执行如下代码;

if (tstate->use_tracing) {                                                   // 检查是否进行跟踪    if (tstate->c_tracefunc != NULL) {                                       // 设置进行跟踪的函数不为空        /* tstate->c_tracefunc, if defined, is a           function that will be called on *every* entry           to a code block.  Its return value, if not           None, is a function that will be called at           the start of each executed line of code.           (Actually, the function must return itself           in order to continue tracing.)  The trace           functions are called with three arguments:           a pointer to the current frame, a string           indicating why the function is called, and           an argument which depends on the situation.           The global trace function is also called           whenever an exception is detected. */        if (call_trace_protected(tstate->c_tracefunc,                                 tstate->c_traceobj,                                 tstate, f, PyTrace_CALL, Py_None)) {        // 调用该函数,该函数就是设置的trace_trampoline函数            /* Trace function raised an error */            goto exit_eval_frame;        }    }    if (tstate->c_profilefunc != NULL) {                                     // 是否设置了profile函数        /* Similar for c_profilefunc, except it needn't           return itself and isn't called for "line" events */        if (call_trace_protected(tstate->c_profilefunc,                                 tstate->c_profileobj,                                 tstate, f, PyTrace_CALL, Py_None)) {            /* Profile function raised an error */            goto exit_eval_frame;        }    }}

此时我们继续查看call_trace_protected函数;

static intcall_trace_protected(Py_tracefunc func, PyObject *obj,                     PyThreadState *tstate, PyFrameObject *frame,                     int what, PyObject *arg){    PyObject *type, *value, *traceback;    int err;    PyErr_Fetch(&type, &value, &traceback);    err = call_trace(func, obj, tstate, frame, what, arg);            // 调用注册的函数    if (err == 0)                                                     // 判断是否执行成功    {        PyErr_Restore(type, value, traceback);                               return 0;    }    else {        Py_XDECREF(type);        Py_XDECREF(value);        Py_XDECREF(traceback);        return -1;                                                    // 失败返回-1    }}static intcall_trace(Py_tracefunc func, PyObject *obj,           PyThreadState *tstate, PyFrameObject *frame,           int what, PyObject *arg){    int result;    if (tstate->tracing)                                             // 判断是否可以执行        return 0;    tstate->tracing++;                                               // 加1    tstate->use_tracing = 0;                                            result = func(obj, frame, what, arg);                            // 调用该函数,其实就是调用了trace_trampoline函数    tstate->use_tracing = ((tstate->c_tracefunc != NULL)                           || (tstate->c_profilefunc != NULL));      // 判断是否继续trace    tstate->tracing--;    return result;}

此时在调用call_trace时,就最终进入了trace_trampoline函数的执行;

static PyObject *call_trampoline(PyObject* callback,                PyFrameObject *frame, int what, PyObject *arg){    PyObject *args;    PyObject *whatstr;    PyObject *result;    args = PyTuple_New(3);                                      // 生成位置参数元组    if (args == NULL)                                           // 如果为空则返回        return NULL;    if (PyFrame_FastToLocalsWithError(frame) < 0)        return NULL;    Py_INCREF(frame);    whatstr = whatstrings[what];                                // 获取对应的操作名称    Py_INCREF(whatstr);    if (arg == NULL)        arg = Py_None;    Py_INCREF(arg);    PyTuple_SET_ITEM(args, 0, (PyObject *)frame);               // 设置参数1 为 frame    PyTuple_SET_ITEM(args, 1, whatstr);                         // 设置参数2 为 对应的操作 如call line等    PyTuple_SET_ITEM(args, 2, arg);                             // 设置传入参数    /* call the Python-level function */    result = PyEval_CallObject(callback, args);                 // 调用对应的回调函数的方法并获取返回结果    PyFrame_LocalsToFast(frame, 1);    if (result == NULL)        PyTraceBack_Here(frame);    /* cleanup */    Py_DECREF(args);    return result;                                              // 返回返回结果}

此时执行完成就很清晰了,如果设置了trace函数,执行完成该trace函数之后的返回值会被设置到下一个frame的f_trace处,基于Python的frame的执行结构,只要有新的code块执行,都会执行该段代码,所以在脚本注释掉return trace这一行之后,任然会打印出call的相关信息,当我们执行了call之后,在下一次执行到;

if (what == PyTrace_CALL)    callback = self;else    callback = frame->f_trace;if (callback == NULL)    return 0;

此时的callback就是上一次执行返回的函数,所以就会在有 return trace该行代码的时候,会继续调用该trace函数去执行。

当代码执行到每一行的时候就会执行到如下代码:

fast_next_opcode:                                                           // 字节码解释的时候执行下一个指令    f->f_lasti = INSTR_OFFSET();                                            // 获取执行的行数    /* line-by-line tracing support */    if (_Py_TracingPossible &&        tstate->c_tracefunc != NULL && !tstate->tracing) {                  // 检查是否有跟踪的函数        int err;        /* see maybe_call_line_trace           for expository comments */        f->f_stacktop = stack_pointer;        err = maybe_call_line_trace(tstate->c_tracefunc,                                    tstate->c_traceobj,                                    tstate, f,                                    &instr_lb, &instr_ub, &instr_prev);    // 可能是line的检查        /* Reload possibly changed frame fields */        JUMPTO(f->f_lasti);        if (f->f_stacktop != NULL) {            stack_pointer = f->f_stacktop;            f->f_stacktop = NULL;        }        if (err)            /* trace function raised an exception */            goto error;    }    /* Extract opcode and argument */    opcode = NEXTOP();                                                      // 获取下一条指令    /* See Objects/lnotab_notes.txt for a description of how tracing works. */    static int    maybe_call_line_trace(Py_tracefunc func, PyObject *obj,                          PyThreadState *tstate, PyFrameObject *frame,                          int *instr_lb, int *instr_ub, int *instr_prev)    {        int result = 0;        int line = frame->f_lineno;                                                  // 获取行数        /* If the last instruction executed isn't in the current           instruction window, reset the window.        */        if (frame->f_lasti < *instr_lb || frame->f_lasti >= *instr_ub) {            PyAddrPair bounds;            line = _PyCode_CheckLineNumber(frame->f_code, frame->f_lasti,                                           &bounds);            *instr_lb = bounds.ap_lower;            *instr_ub = bounds.ap_upper;        }        /* If the last instruction falls at the start of a line or if           it represents a jump backwards, update the frame's line           number and call the trace function. */        if (frame->f_lasti == *instr_lb || frame->f_lasti < *instr_prev) {            frame->f_lineno = line;                                                 // 重新更新行数            result = call_trace(func, obj, tstate, frame, PyTrace_LINE, Py_None);   // 调用记录的行数的trace函数        }        *instr_prev = frame->f_lasti;                                               // 保存当前的行数        return result;    }

至此,调用到的line 和 call记录都已经分析完成,该流程就是基本的Python代码监控的基本执行流程。

总结

本文主要通过分析sys.settrace函数的执行流程,深入Python源码分析了是如何调用定义的trace函数,本文主要就是分析了call事件和line事件的执行流程,主要都是在Python的字节码的执行过程中去解析记录并调用注册的回调函数,也主要是通过Python的frame概念,将对应的执行函数都绑定在frame的属性上,以此达到只要注册了settrace函数,在没有返回的时候还是会触发’call‘事件,其他还有EXCEPTION等事件的触发,大家有兴趣可自行查看相关代码。鉴于本人才疏学浅,如有疏漏请批评指正。

转载地址:https://blog.csdn.net/qq_33339479/article/details/88535305 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Nginx源码分析:启动流程
下一篇:coverage代码覆盖率测试工具:基本原理分析与使用

发表评论

最新留言

不错!
[***.144.177.141]2024年04月17日 16时44分29秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章