Django源码分析8:单元测试test命令浅析
发布日期:2021-07-25 13:04:46 浏览次数:13 分类:技术文章

本文共 45373 字,大约阅读时间需要 151 分钟。

django源码分析

本文环境python3.5.2,django1.10.x系列

django源码分析-test命令分析

Django项目中提供了,test命令行命令来执行django的单元测试,该测试用例就是测试自己编写的api测试,用来测试自己编写的代码是否正确,当我们修改旧的代码时,通过运行测试来确保不会影响到旧的逻辑,单元测试是Django项目中很重要的一部分。本文按照mysql案例进行说明,相关的数据库配置如下:

DATABASES = {    'default': {        'ENGINE': 'django.db.backends.mysql',        'NAME': 'project',        'USER': 'test',        'PASSWORD': 'test',        'HOST': '127.0.0.1',        'PORT': '3306',        "ATOMIC_REQUESTS": True,        "TEST": {            "NAME": "test_db",            "CHARSET": "utf8",            "COLLATION": "utf8_general_ci"    }}

本文的test用例代码,位于项目的tests.py文件中,示例代码如下;

class ApplicationListAPITest(TestCase):    def setUp(self):        self.user = self.create_user()    def create_user(self, email=None, **kwargs):        name = kwargs.get("name") if kwargs.get("name", None) else email        password = md5hex(kwargs.get('password', 'password'))        is_authorize = kwargs.get('is_authorize', False)        kwargs.setdefault('account', email)        kwargs.setdefault('name', name)        kwargs.setdefault('type', user_type)        kwargs.setdefault('is_staff', False)        kwargs.setdefault('mobile', '12345678901')        user = User(**kwargs)        user.set_password(password)        user.save()        return user    def login_as(self, user, password='password'):        return self.client.login(username=user.name, password=password)    def test_list(self):        self.login_as(self.user)        url = '/test'        resp = self.client.get(url)        ret_data = resp.data        self.assertEqual(resp.status_code, 200)

test的执行概述

在命令行中输入的测试,可能会有好几种情况出现,比如如下几种基本情况;

python manage.py test  # 执行所有项目下的test.py中的测试用例python manage.py test app_name.tests  # 执行app_name下的tests中的测试用例 效果等同于app_namepython manage.py test app_name.tests. ApplicationListAPITest    # 执行app_name.tests文件中的ApplicationListAPITest测试用例python manage.py test app_name.tests. ApplicationListAPITest.test_list      # 执行app_name.tests文件中的ApplicationListAPITest的test_list方法

鉴于输入的情况较多和平常使用的具体环境,通过分析其中的任意一个的执行流程就可知其他几种情况的执行过程,故就举例使用python manage.py test 来描述执行test的过程,首先查看test中的Command的handle处理。

def handle(self, *test_labels, **options):    from django.conf import settings                                            # 导入配置文件    from django.test.utils import get_runner                                    # 获取test的执行类    TestRunner = get_runner(settings, options['testrunner'])                    # 导入testrunner    if options['liveserver'] is not None:                                       # 判断是否使用liveserver 是否正真启动一个django server        os.environ['DJANGO_LIVE_TEST_SERVER_ADDRESS'] = options['liveserver']    del options['liveserver']    test_runner = TestRunner(**options)                                         # 初始化TestRunner    failures = test_runner.run_tests(test_labels)                               # 运行测试用例    if failures:        sys.exit(bool(failures))                                                # 如果有失败返回失败信息

首先查看get_runner函数,该函数主要就是导入测试用的testrunner类,

def get_runner(settings, test_runner_class=None):    if not test_runner_class:                                           # 如果没有传入test_runner_class        test_runner_class = settings.TEST_RUNNER                        # 使用配置文件中的TEST_RUNNER,在都不配置的情况下默认使用django.test.runner.DiscoverRunner    test_path = test_runner_class.split('.')                        # Allow for Python 2.5 relative paths    if len(test_path) > 1:        test_module_name = '.'.join(test_path[:-1])    else:        test_module_name = '.'    test_module = __import__(test_module_name, {}, {}, force_str(test_path[-1]))        # 导入该runner类的module    test_runner = getattr(test_module, test_path[-1])                                   # 获取该类    return test_runner                                                                  # 返回该类

由于在都不配置的情况下,使用django.test.runner.DiscoverRunner类,本文就按照该类来分析,在初始化该实例后,就调用了run_tests方法来执行所有的测试,

def run_tests(self, test_labels, extra_tests=None, **kwargs):    """    Run the unit tests for all the test labels in the provided list.    Test labels should be dotted Python paths to test modules, test    classes, or test methods.    A list of 'extra' tests may also be provided; these tests    will be added to the test suite.    Returns the number of tests that failed.    """    self.setup_test_environment()                           # 建立测试环境    suite = self.build_suite(test_labels, extra_tests)      # 建立测试的suite,这是将所有的测试进行包装,然后包装成一个suite    old_config = self.setup_databases()                     # 建立测试数据库    result = self.run_suite(suite)                          # 执行所有找到的测试用例    self.teardown_databases(old_config)                     # 摧毁数据库    self.teardown_test_environment()                        # 摧毁测试环境    return self.suite_result(suite, result)                 # 包装返回测试结果

首先来查看self.setup_test_environment建立测试环境的过程;

def setup_test_environment(self, **kwargs):    setup_test_environment()                # 建立测试环境    settings.DEBUG = False                  # 设置运行环境debug为False    unittest.installHandler()               # 注册handler...def setup_test_environment():    """    Perform global pre-test setup, such as installing the instrumented template    renderer and setting the email backend to the locmem email backend.    """    Template._original_render = Template._render                                # 配置render    Template._render = instrumented_test_render                 # Storing previous values in the settings module itself is problematic.    # Store them in arbitrary (but related) modules instead. See #20636.    mail._original_email_backend = settings.EMAIL_BACKEND                       # 配置邮件    settings.EMAIL_BACKEND = 'django.core.mail.backends.locmem.EmailBackend'    request._original_allowed_hosts = settings.ALLOWED_HOSTS                    # 配置hosts    settings.ALLOWED_HOSTS = ['*']                                              # 测试环境下允许所有host连接    mail.outbox = []    deactivate()...def installHandler():global _interrupt_handlerif _interrupt_handler is None:    default_handler = signal.getsignal(signal.SIGINT)                           # 获取SIGINT    _interrupt_handler = _InterruptHandler(default_handler)                     # 包装default_handler    signal.signal(signal.SIGINT, _interrupt_handler)                            # 注册信号与对应的处理函数

主要就是重新配置了运行的模式,添加了所有的hosts访问,注册了相关handler的处理函数。

当处理完成初始环境后,就来执行比较重要的build_suite的过程了,

self.build_suite(test_labels, extra_tests)

该函数的执行过程如下;

def build_suite(self, test_labels=None, extra_tests=None, **kwargs):    suite = self.test_suite()                                                   # unittest.TestSuite的初始化    test_labels = test_labels or ['.']                                          # 由于没有传入参数,test_labels为['.']    extra_tests = extra_tests or []                                             # extra_tests默认为空           discover_kwargs = {}                                                        # 保存查找的匹配模式和查找目录等信息    if self.pattern is not None:                                                # 如果不为空        discover_kwargs['pattern'] = self.pattern                               # 使用该pattern 默认的为test*.py    if self.top_level is not None:        discover_kwargs['top_level_dir'] = self.top_level                       # 查找的目录    for label in test_labels:                                                   # 遍历        kwargs = discover_kwargs.copy()                                         # 拷贝一份        tests = None        label_as_path = os.path.abspath(label)                                  # 获取label所在的目录的绝对地址        # if a module, or "module.ClassName[.method_name]", just run those        if not os.path.exists(label_as_path):                                   # 如果不存在则代表是使用了类名或者方法作为参数            tests = self.test_loader.loadTestsFromName(label)                   # 加载查找suite        elif os.path.isdir(label_as_path) and not self.top_level:               # 如果是目录 并且top_level为空            # Try to be a bit smarter than unittest about finding the            # default top-level for a given directory path, to avoid            # breaking relative imports. (Unittest's default is to set            # top-level equal to the path, which means relative imports            # will result in "Attempted relative import in non-package.").            # We'd be happy to skip this and require dotted module paths            # (which don't cause this problem) instead of file paths (which            # do), but in the case of a directory in the cwd, which would            # be equally valid if considered as a top-level module or as a            # directory path, unittest unfortunately prefers the latter.            top_level = label_as_path                                           # 获取当前的目录            while True:                init_py = os.path.join(top_level, '__init__.py')                # 加入__init__.py                if os.path.exists(init_py):                                     # 检查包的__init__.py是否存在                    try_next = os.path.dirname(top_level)                       # 存在则获取目录下的名称                    if try_next == top_level:                                   # 如果和上一级目录相同则停止                        # __init__.py all the way down? give up.                        break                    top_level = try_next                                        # 否则继续深入查找                    continue                break            kwargs['top_level_dir'] = top_level                                 # 赋值top_level_dir         if not (tests and tests.countTestCases()) and is_discoverable(label):   # 如果没有测试tests 并且是可以查找的目录            # Try discovery if path is a package or directory            tests = self.test_loader.discover(start_dir=label, **kwargs)        # 查找目录里面的tests            # Make unittest forget the top-level dir it calculated from this            # run, to support running tests from two different top-levels.            self.test_loader._top_level_dir = None                              # 置空        suite.addTests(tests)                                                   # 将找到的tests添加到suite中    for test in extra_tests:        suite.addTest(test)    if self.tags or self.exclude_tags:                                          # 是否有tag        suite = filter_tests_by_tags(suite, self.tags, self.exclude_tags)       # 找到有tag的tests    suite = reorder_suite(suite, self.reorder_by, self.reverse)                 # 按照reorder_by排序test    if self.parallel > 1:        parallel_suite = self.parallel_test_suite(suite, self.parallel, self.failfast)        # Since tests are distributed across processes on a per-TestCase        # basis, there's no need for more processes than TestCases.        parallel_units = len(parallel_suite.subsuites)        if self.parallel > parallel_units:            self.parallel = parallel_units        # If there's only one TestCase, parallelization isn't needed.        if self.parallel > 1:            suite = parallel_suite    return suite                                                                # 返回数据

先是查找对应的匹配的表达式,然后根据输入参数查找对应的目录,由于此时没有带具体的类名的参数输入,所以会执行到如下代码,去检查目录下所有的文件;

# Try discovery if path is a package or directory                tests = self.test_loader.discover(start_dir=label, **kwargs)

其中的test_loader就是unittest.defaultTestLoader,调用该实例的discover方法,

def discover(self, start_dir, pattern='test*.py', top_level_dir=None):    """Find and return all test modules from the specified start    directory, recursing into subdirectories to find them and return all    tests found within them. Only test files that match the pattern will    be loaded. (Using shell style pattern matching.)    All test modules must be importable from the top level of the project.    If the start directory is not the top level directory then the top    level directory must be specified separately.    If a test package name (directory with '__init__.py') matches the    pattern then the package will be checked for a 'load_tests' function. If    this exists then it will be called with (loader, tests, pattern) unless    the package has already had load_tests called from the same discovery    invocation, in which case the package module object is not scanned for    tests - this ensures that when a package uses discover to further    discover child tests that infinite recursion does not happen.    If load_tests exists then discovery does *not* recurse into the package,    load_tests is responsible for loading all tests in the package.    The pattern is deliberately not stored as a loader attribute so that    packages can continue discovery themselves. top_level_dir is stored so    load_tests does not need to pass this argument in to loader.discover().    Paths are sorted before being imported to ensure reproducible execution    order even on filesystems with non-alphabetical ordering like ext3/4.    """    set_implicit_top = False    if top_level_dir is None and self._top_level_dir is not None:               #由于此时传入的top_level_dir不为空        # make top_level_dir optional if called from load_tests in a package        top_level_dir = self._top_level_dir    elif top_level_dir is None:        set_implicit_top = True        top_level_dir = start_dir    top_level_dir = os.path.abspath(top_level_dir)                              # 直接获取目录的绝对路径    if not top_level_dir in sys.path:                                           # 如果该目录不再系统路径中则加入        # all test modules must be importable from the top level directory        # should we *unconditionally* put the start directory in first        # in sys.path to minimise likelihood of conflicts between installed        # modules and development versions?        sys.path.insert(0, top_level_dir)    self._top_level_dir = top_level_dir                                         # 设置目录名    is_not_importable = False    is_namespace = False    tests = []    if os.path.isdir(os.path.abspath(start_dir)):                               # 检查开始的目录是否为文件夹        start_dir = os.path.abspath(start_dir)                                  # 获取开始的目录的绝对地址        if start_dir != top_level_dir:                                          # 如果不相同            is_not_importable = not os.path.isfile(os.path.join(start_dir, '__init__.py'))  # 判断是否拥有__init__.py文件,即判断是否是模块    else:        # support for discovery from dotted module names        try:            __import__(start_dir)                                               # 直接导入        except ImportError:            is_not_importable = True                                            # 设置不能导入的标志位        else:            the_module = sys.modules[start_dir]                                 # 获取该module            top_part = start_dir.split('.')[0]                                  # 获取顶级目录            try:                start_dir = os.path.abspath(                   os.path.dirname((the_module.__file__)))                      # 获取绝对目录            except AttributeError:                # look for namespace packages                try:                    spec = the_module.__spec__                except AttributeError:                    spec = None                if spec and spec.loader is None:                    if spec.submodule_search_locations is not None:                        is_namespace = True                        for path in the_module.__path__:                            if (not set_implicit_top and                                not path.startswith(top_level_dir)):                                continue                            self._top_level_dir = \                                (path.split(the_module.__name__                                     .replace(".", os.path.sep))[0])                            tests.extend(self._find_tests(path,                                                          pattern,                                                          namespace=True))                elif the_module.__name__ in sys.builtin_module_names:                    # builtin module                    raise TypeError('Can not use builtin modules '                                    'as dotted module names') from None                else:                    raise TypeError(                        'don\'t know how to discover from {!r}'                        .format(the_module)) from None            if set_implicit_top:                if not is_namespace:                    self._top_level_dir = \                       self._get_directory_containing_module(top_part)                    sys.path.remove(top_level_dir)                else:                    sys.path.remove(top_level_dir)    if is_not_importable:                                                           # 判断是否可导入  如果不能导入则报错        raise ImportError('Start directory is not importable: %r' % start_dir)    if not is_namespace:        tests = list(self._find_tests(start_dir, pattern))                          # 查找该目录下的所有tests    return self.suiteClass(tests)                                                   # 添加tests

先判断是否能够导入,导入后调用self._find_tests方法来查找tests,

def _find_tests(self, start_dir, pattern, namespace=False):    """Used by discovery. Yields test suites it loads."""    # Handle the __init__ in this package    name = self._get_name_from_path(start_dir)                                  # 获取开始的 name 此时为 '.'    # name is '.' when start_dir == top_level_dir (and top_level_dir is by    # definition not a package).    if name != '.' and name not in self._loading_packages:                      # 判断name不为 '.' 并且name不再已经导入的packages中        # name is in self._loading_packages while we have called into        # loadTestsFromModule with name.        tests, should_recurse = self._find_test_path(            start_dir, pattern, namespace)                                      # 根据dir pattern namespace查找tests        if tests is not None:                                                   # 如果tests不为空则返回            yield tests                                                                 if not should_recurse:                                                  # 如果为False则出了不能导入的错误            # Either an error occurred, or load_tests was used by the            # package.            return    # Handle the contents.    paths = sorted(os.listdir(start_dir))                                       # 获取当前的目录下的所有文件或文件夹    for path in paths:                                                          # 遍历所有子文件或目录        full_path = os.path.join(start_dir, path)                               # 获取全路径        tests, should_recurse = self._find_test_path(            full_path, pattern, namespace)                                      # 查找tests        if tests is not None:                                                   # 如果不为空则返回            yield tests        if should_recurse:                                                                  # we found a package that didn't use load_tests.            name = self._get_name_from_path(full_path)                          # 获取name            self._loading_packages.add(name)                                    # 添加到导入的packages集合中            try:                yield from self._find_tests(full_path, pattern, namespace)      # 继续递归遍历            finally:                self._loading_packages.discard(name)                            # 删除该name

主要是通过了递归调用来查找是否有tests内容,主要是通过self._find_test_path函数来查找tests,

def _find_test_path(self, full_path, pattern, namespace=False):    """Used by discovery.    Loads tests from a single file, or a directories' __init__.py when    passed the directory.    Returns a tuple (None_or_tests_from_file, should_recurse).    """    basename = os.path.basename(full_path)                                  # 获取文件夹名称    if os.path.isfile(full_path):                                           # 检查是否是文件        if not VALID_MODULE_NAME.match(basename):                           # 检查是否是合法的名称            # valid Python identifiers only            return None, False        if not self._match_path(basename, full_path, pattern):              # 检查是否配置pattern            return None, False                                              # 没有匹配上返回None False        # if the test file matches, load it        name = self._get_name_from_path(full_path)                          # 重新获取路径        try:            module = self._get_module_from_name(name)                       # 导入该module        except case.SkipTest as e:            return _make_skipped_test(name, e, self.suiteClass), False        except:            error_case, error_message = \                _make_failed_import_test(name, self.suiteClass)            self.errors.append(error_message)            return error_case, False        else:            mod_file = os.path.abspath(                getattr(module, '__file__', full_path))                                         realpath = _jython_aware_splitext(                os.path.realpath(mod_file))            fullpath_noext = _jython_aware_splitext(                os.path.realpath(full_path))            if realpath.lower() != fullpath_noext.lower():                module_dir = os.path.dirname(realpath)                mod_name = _jython_aware_splitext(                    os.path.basename(full_path))                expected_dir = os.path.dirname(full_path)                msg = ("%r module incorrectly imported from %r. Expected "                       "%r. Is this module globally installed?")                raise ImportError(                    msg % (mod_name, module_dir, expected_dir))            return self.loadTestsFromModule(module, pattern=pattern), False     # 调用从Module中查找tests    elif os.path.isdir(full_path):                                              # 如果是文件夹        if (not namespace and            not os.path.isfile(os.path.join(full_path, '__init__.py'))):            return None, False                                                  # 如果没有namespace 并没有__init__.py文件则返回空        load_tests = None        tests = None        name = self._get_name_from_path(full_path)                              # 重新设置路径名        try:            package = self._get_module_from_name(name)                          # 获取module        except case.SkipTest as e:            return _make_skipped_test(name, e, self.suiteClass), False        except:            error_case, error_message = \                _make_failed_import_test(name, self.suiteClass)            self.errors.append(error_message)            return error_case, False        else:            load_tests = getattr(package, 'load_tests', None)                   # 获取load_tests属性            # Mark this package as being in load_tests (possibly ;))                self._loading_packages.add(name)                                    # 添加到loading_package            try:                tests = self.loadTestsFromModule(package, pattern=pattern)      # 从module中导入tests                if load_tests is not None:                    # loadTestsFromModule(package) has loaded tests for us.                    return tests, False                return tests, True                                              # 返回找到的tests            finally:                self._loading_packages.discard(name)                            # 删除name    else:        return None, False

主要是判断了是否是文件或者是否是文件夹,然后导入对应的module然后调用self.loadTestsFromModule去查找对应的tests,

def loadTestsFromModule(self, module, *args, pattern=None, **kws):    """Return a suite of all tests cases contained in the given module"""    # This method used to take an undocumented and unofficial    # use_load_tests argument.  For backward compatibility, we still    # accept the argument (which can also be the first position) but we    # ignore it and issue a deprecation warning if it's present.    if len(args) > 0 or 'use_load_tests' in kws:                                # 判断传入参数或者是否有        warnings.warn('use_load_tests is deprecated and ignored',                      DeprecationWarning)        kws.pop('use_load_tests', None)    if len(args) > 1:        # Complain about the number of arguments, but don't forget the        # required `module` argument.        complaint = len(args) + 1        raise TypeError('loadTestsFromModule() takes 1 positional argument but {} were given'.format(complaint))    if len(kws) != 0:        # Since the keyword arguments are unsorted (see PEP 468), just        # pick the alphabetically sorted first argument to complain about,        # if multiple were given.  At least the error message will be        # predictable.        complaint = sorted(kws)[0]        raise TypeError("loadTestsFromModule() got an unexpected keyword argument '{}'".format(complaint))    tests = []    for name in dir(module):                                                    # 获取这个模块下所有的内容        obj = getattr(module, name)                                             # 判断obj是否是type实例,并且是否是TestCase的子类        if isinstance(obj, type) and issubclass(obj, case.TestCase):            tests.append(self.loadTestsFromTestCase(obj))                       # 如果是,则调用loadTestsFromTestCase去查找类方法    load_tests = getattr(module, 'load_tests', None)    tests = self.suiteClass(tests)                                              # TestSuite实例化成tests    if load_tests is not None:        try:            return load_tests(self, tests, pattern)             except Exception as e:            error_case, error_message = _make_failed_load_tests(                module.__name__, e, self.suiteClass)            self.errors.append(error_message)            return error_case    return tests                                                                # 返回

此时就直接调用了self.loadTestsFromTestCase去查找tests,

def loadTestsFromTestCase(self, testCaseClass):    """Return a suite of all tests cases contained in testCaseClass"""    if issubclass(testCaseClass, suite.TestSuite):                          # 判断是否是TestSuite的子类        raise TypeError("Test cases should not be derived from "                        "TestSuite. Maybe you meant to derive from "                        "TestCase?")    testCaseNames = self.getTestCaseNames(testCaseClass)                    # 获取所有的test_*的属性并返回成列表    if not testCaseNames and hasattr(testCaseClass, 'runTest'):             # 如果没有符合条件的列表,并且有runTest属性        testCaseNames = ['runTest']                                         # 重写该列表    loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))       # 实例化成TestSuite实例    return loaded_suite                                                     # 返回该实例

其中,此时传入的testCaseClass就是本文例子中的ApplicationListAPITest,此时通过self.getTestCaseNames方法获取符合条件的属性列表,

def getTestCaseNames(self, testCaseClass):    """Return a sorted sequence of method names found within testCaseClass    """    def isTestMethod(attrname, testCaseClass=testCaseClass,                     prefix=self.testMethodPrefix):                             # 此时的testMethodPrefix就是'test'        return attrname.startswith(prefix) and \            callable(getattr(testCaseClass, attrname))                          # 判断是否以'test'开头并且能够callable    testFnNames = list(filter(isTestMethod, dir(testCaseClass)))                # 获取该类所有的属性,并    if self.sortTestMethodsUsing:        testFnNames.sort(key=functools.cmp_to_key(self.sortTestMethodsUsing))   # 重写排序    return testFnNames                                                          # 返回属性名称列表

此时就self.suiteClass(map(testCaseClass, testCaseNames)),此时的testCaseClass就是继承自TestCase的ApplicationListAPITest,testCaseNames为[‘test_list’],此时就是实例化了一个继承自TestCase的ApplicationListAPITest类的实例,实例化的时候,传入的是’test_list’;

def __init__(self, methodName='runTest'):        """Create an instance of the class that will use the named test           method when executed. Raises a ValueError if the instance does           not have a method with the specified name.        """        self._testMethodName = methodName  # 传入待测试的方法名称        self._outcome = None        self._testMethodDoc = 'No test'

至此,tests查找完成,将找到的tests添加到TestSuite实例中。

创建测试数据库

完成了所有的tests添加到suite中,此时就会执行数据库创建过程;

old_config = self.setup_databases()

调用setup_databases函数;

def setup_databases(self, **kwargs):    return setup_databases(        self.verbosity, self.interactive, self.keepdb, self.debug_sql,        self.parallel, **kwargs    )       # 创建数据库...def get_unique_databases_and_mirrors():    """    Figure out which databases actually need to be created.    Deduplicate entries in DATABASES that correspond the same database or are    configured as test mirrors.    Return two values:    - test_databases: ordered mapping of signatures to (name, list of aliases)                      where all aliases share the same underlying database.    - mirrored_aliases: mapping of mirror aliases to original aliases.    """    mirrored_aliases = {}    test_databases = {}    dependencies = {}    default_sig = connections[DEFAULT_DB_ALIAS].creation.test_db_signature()   # 获取配置HOST PORT ENGINE 数据库名称    for alias in connections:        connection = connections[alias]        test_settings = connection.settings_dict['TEST']                        # 获取TEST配置信息        if test_settings['MIRROR']:                                             # 是否设置MIRROR            # If the database is marked as a test mirror, save the alias.            mirrored_aliases[alias] = test_settings['MIRROR']        else:            # Store a tuple with DB parameters that uniquely identify it.            # If we have two aliases with the same values for that tuple,            # we only need to create the test database once.            item = test_databases.setdefault(                connection.creation.test_db_signature(),                (connection.settings_dict['NAME'], set())            )                                                                   # 设置HOST 端口数据库等信息            item[1].add(alias)                                                  #             if 'DEPENDENCIES' in test_settings:                dependencies[alias] = test_settings['DEPENDENCIES']            else:                if alias != DEFAULT_DB_ALIAS and connection.creation.test_db_signature() != default_sig:                    dependencies[alias] = test_settings.get('DEPENDENCIES', [DEFAULT_DB_ALIAS])    test_databases = dependency_ordered(test_databases.items(), dependencies)    test_databases = collections.OrderedDict(test_databases)                    # 排序返回    return test_databases, mirrored_aliases...def setup_databases(verbosity, interactive, keepdb=False, debug_sql=False, parallel=0, **kwargs):    """    Creates the test databases.    """    test_databases, mirrored_aliases = get_unique_databases_and_mirrors()       # 获取数据 orderdict(('127.0.0.1','3306','django.db.backends.mysql','test_db'), ('test', {'default'}))    old_names = []    for signature, (db_name, aliases) in test_databases.items():                # 遍历        first_alias = None        for alias in aliases:                                                   # 获取连接信息            connection = connections[alias]            old_names.append((connection, db_name, first_alias is None))            # Actually create the database for the first connection            if first_alias is None:                                             # 第一次创建test                first_alias = alias                connection.creation.create_test_db(                    verbosity=verbosity,                    autoclobber=not interactive,                    keepdb=keepdb,                    serialize=connection.settings_dict.get("TEST", {}).get("SERIALIZE", True),                )                                                               # 创建test数据库                if parallel > 1:                    for index in range(parallel):                        connection.creation.clone_test_db(                            number=index + 1,                            verbosity=verbosity,                            keepdb=keepdb,                        )            # Configure all other connections as mirrors of the first one            else:                connections[alias].creation.set_as_test_mirror(                    connections[first_alias].settings_dict)    # Configure the test mirrors.    for alias, mirror_alias in mirrored_aliases.items():        connections[alias].creation.set_as_test_mirror(            connections[mirror_alias].settings_dict)    if debug_sql:                                                               # 是否调试sql        for alias in connections:            connections[alias].force_debug_cursor = True                        # 调试    return old_names                                                            # 返回

其中,调用了创建方法;

connection.creation.create_test_db(        verbosity=verbosity,        autoclobber=not interactive,        keepdb=keepdb,        serialize=connection.settings_dict.get("TEST", {}).get("SERIALIZE", True),    )

主要是创建数据库,创建测试数据库相关信息;

def create_test_db(self, verbosity=1, autoclobber=False, serialize=True, keepdb=False):    """    Creates a test database, prompting the user for confirmation if the    database already exists. Returns the name of the test database created.    """    # Don't import django.core.management if it isn't needed.    from django.core.management import call_command    test_database_name = self._get_test_db_name()                               # 获取测试数据库名称    if verbosity >= 1:        action = 'Creating'        if keepdb:                                                              # 如果使用已存在的数据库            action = "Using existing"        print("%s test database for alias %s..." % (            action,            self._get_database_display_str(verbosity, test_database_name),        ))    # We could skip this call if keepdb is True, but we instead    # give it the keepdb param. This is to handle the case    # where the test DB doesn't exist, in which case we need to    # create it, then just not destroy it. If we instead skip    # this, we will get an exception.    self._create_test_db(verbosity, autoclobber, keepdb)                        # 新建数据库    self.connection.close()                                                     # 关闭连接    settings.DATABASES[self.connection.alias]["NAME"] = test_database_name    self.connection.settings_dict["NAME"] = test_database_name                  # 设置数据库名称    # We report migrate messages at one level lower than that requested.    # This ensures we don't get flooded with messages during testing    # (unless you really ask to be flooded).    call_command(        'migrate',        verbosity=max(verbosity - 1, 0),        interactive=False,        database=self.connection.alias,        run_syncdb=True,    )                                                                           # 通过命令行命令调用Migrate生成数据库     # We then serialize the current state of the database into a string    # and store it on the connection. This slightly horrific process is so people    # who are testing on databases without transactions or who are using    # a TransactionTestCase still get a clean database on every test run.    if serialize:        self.connection._test_serialized_contents = self.serialize_db_to_string()    call_command('createcachetable', database=self.connection.alias)            # 创建缓存表    # Ensure a connection for the side effect of initializing the test database.    self.connection.ensure_connection()                                         # 确保新连接    return test_database_name                                                   # 返回数据名
执行tests
result = self.run_suite(suite)

该函数主要就是运行找到的tests,执行流程如下;

def run_suite(self, suite, **kwargs):    resultclass = self.get_resultclass()        # 设置resultclass    return self.test_runner(        verbosity=self.verbosity,        failfast=self.failfast,        resultclass=resultclass,    ).run(suite)                                # 运行找到的tests

主要是调用了unittest.TextTestRunner的run方法,将找到的sutie作为参数传入;

def run(self, test):    "Run the given test case or test suite."    result = self._makeResult()                                     # 初始化result    registerResult(result)      result.failfast = self.failfast    result.buffer = self.buffer    result.tb_locals = self.tb_locals    with warnings.catch_warnings():                                 # 捕获相关警告信息        if self.warnings:            # if self.warnings is set, use it to filter all the warnings            warnings.simplefilter(self.warnings)            # if the filter is 'default' or 'always', special-case the            # warnings from the deprecated unittest methods to show them            # no more than once per module, because they can be fairly            # noisy.  The -Wd and -Wa flags can be used to bypass this            # only when self.warnings is None.            if self.warnings in ['default', 'always']:                warnings.filterwarnings('module',                        category=DeprecationWarning,                        message='Please use assert\w+ instead.')        startTime = time.time()                                     # 获取当前时间        startTestRun = getattr(result, 'startTestRun', None)        # 获取是否有startTestRun属性        if startTestRun is not None:            startTestRun()                                          # 有就运行        try:            test(result)                                            # 调用test的__call__方法将result传入        finally:            stopTestRun = getattr(result, 'stopTestRun', None)      # 获取是否有stopTestRun方法            if stopTestRun is not None:                stopTestRun()        stopTime = time.time()                                      # 获取结束时间    timeTaken = stopTime - startTime                                # 获取运行的总时间    result.printErrors()                                            # 处理相关运行的信息状态    ...                 # 相关运行结果的显示处理

此时就是调用了传入的suite的call方法,由于suite类继承自BaseTestSuite方法就是调用了该类的call方法,

def __call__(self, *args, **kwds):    return self.run(*args, **kwds)

就是调用了run方法,此时的suite类是unittest.TestSuite,该类的run方法;

def run(self, result, debug=False):    topLevel = False    if getattr(result, '_testRunEntered', False) is False:        result._testRunEntered = topLevel = True    for index, test in enumerate(self):                             # 访问self.__iter__方法        if result.shouldStop:            break        if _isnotsuite(test):            self._tearDownPreviousClass(test, result)            self._handleModuleFixture(test, result)            self._handleClassSetUp(test, result)            result._previousTestClass = test.__class__            if (getattr(test.__class__, '_classSetupFailed', False) or                getattr(result, '_moduleSetUpFailed', False)):                continue        if not debug:                                               # 如果debug为False则直接调用test的call方法            test(result)        else:            test.debug()        if self._cleanup:            self._removeTestAtIndex(index)    if topLevel:        self._tearDownPreviousClass(None, result)        self._handleModuleTearDown(result)        result._testRunEntered = False    return result

此时的self.iter方法对应如下;

def __iter__(self):    return iter(self._tests)

所以,test(result)执行的就是test的call方法,此时的test都是TestCase类的实例;此时调用了基类的SimpleTestCase的call方法;

def __call__(self, result=None):    """    Wrapper around default __call__ method to perform common Django test    set up. This means that user-defined Test Cases aren't required to    include a call to super().setUp().    """    testMethod = getattr(self, self._testMethodName)    skipped = (        getattr(self.__class__, "__unittest_skip__", False) or        getattr(testMethod, "__unittest_skip__", False)    )    if not skipped:        try:            self._pre_setup()                                   # 调用前期准备方法        except Exception:            result.addError(self, sys.exc_info())            return    super(SimpleTestCase, self).__call__(result)                # 调用父类__call__方法    if not skipped:        try:            self._post_teardown()                               # 如果不跳过则调用该接受清理数据        except Exception:            result.addError(self, sys.exc_info())            return

在调用了self._post_teardown()方法,其中就有在执行完成后清理数据库的方法,

def _fixture_teardown(self):    # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal    # when flushing only a subset of the apps    for db_name in self._databases_names(include_mirrors=False):        # Flush the database        inhibit_post_migrate = (            self.available_apps is not None or            (   # Inhibit the post_migrate signal when using serialized                # rollback to avoid trying to recreate the serialized data.                self.serialized_rollback and                hasattr(connections[db_name], '_test_serialized_contents')            )        )        call_command('flush', verbosity=0, interactive=False,                     database=db_name, reset_sequences=False,                     allow_cascade=self.available_apps is not None,                     inhibit_post_migrate=inhibit_post_migrate)         # 调用flush命令清理数据库数据

这就是在test执行完成后,test数据库还存在的话里面的数据都为空的原因,继续查看基类的TestCase基类的call方法,

def __call__(self, *args, **kwds):    return self.run(*args, **kwds)

此时调用了TestSuite的run方法;

def run(self, result=None):    orig_result = result    if result is None:                                                      # 如果传入的result为空        result = self.defaultTestResult()                                   # 实例化一个默认的result.TestResult()实例        startTestRun = getattr(result, 'startTestRun', None)                # 获取该实例的startTestRun属性        if startTestRun is not None:                                        # 如果不为空则执行            startTestRun()    result.startTest(self)    testMethod = getattr(self, self._testMethodName)                        # 获取自己的测试方法,test_list    if (getattr(self.__class__, "__unittest_skip__", False) or              # 如果测试类上有跳过        getattr(testMethod, "__unittest_skip__", False)):                   # 如果测试方法上有则跳过        # If the class or method was skipped.        try:            skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')                        or getattr(testMethod, '__unittest_skip_why__', ''))            self._addSkip(result, self, skip_why)        finally:            result.stopTest(self)                                           # 停止并返回        return    expecting_failure_method = getattr(testMethod,                                       "__unittest_expecting_failure__", False)     # 获取失败方法    expecting_failure_class = getattr(self,                                      "__unittest_expecting_failure__", False)      # 获取失败类    expecting_failure = expecting_failure_class or expecting_failure_method    outcome = _Outcome(result)    try:        self._outcome = outcome        with outcome.testPartExecutor(self):            self.setUp()                                                            # 调用setup方法        if outcome.success:                                                         # 如果成功            outcome.expecting_failure = expecting_failure            with outcome.testPartExecutor(self, isTest=True):                testMethod()                                                        # 调用测试的方法并执行            outcome.expecting_failure = False            with outcome.testPartExecutor(self):                self.tearDown()                                                     # 最后调用tearDown方法        self.doCleanups()                                                           # 清理        for test, reason in outcome.skipped:            self._addSkip(result, test, reason)        self._feedErrorsToResult(result, outcome.errors)        if outcome.success:            if expecting_failure:                if outcome.expectedFailure:                    self._addExpectedFailure(result, outcome.expectedFailure)                else:                    self._addUnexpectedSuccess(result)            else:                result.addSuccess(self)                                                     return result                                                               # 返回结果    finally:        result.stopTest(self)                                                       # 执行完成后进行清理工作        if orig_result is None:            stopTestRun = getattr(result, 'stopTestRun', None)            if stopTestRun is not None:                stopTestRun()        # explicitly break reference cycles:        # outcome.errors -> frame -> outcome -> outcome.errors        # outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure        outcome.errors.clear()        outcome.expectedFailure = None        # clear the outcome, no more needed        self._outcome = None

至此,一个test_list测试方法就算执行完成,其中的其他细节大家可以自行阅读。

测试执行完成清理数据库和环境
self.teardown_databases(old_config)    self.teardown_test_environment()

其中主要就是根据参数值判断是否需要删除创建的测试数据库;

def teardown_databases(self, old_config, **kwargs):    """    Destroys all the non-mirror databases.    """    for connection, old_name, destroy in old_config:        if destroy:            if self.parallel > 1:                for index in range(self.parallel):                    connection.creation.destroy_test_db(                        number=index + 1,                        verbosity=self.verbosity,                        keepdb=self.keepdb,                    )            connection.creation.destroy_test_db(old_name, self.verbosity, self.keepdb)  # 删除数据库...def teardown_test_environment(self, **kwargs):    unittest.removeHandler()                # 移除信号的handler    teardown_test_environment()             # 撤销环境信息

至此整个测试就执行完成,所有的条件都恢复成原来的值。

总结

作为Django项目中比较重要的test的过程,大致如上所述,先建立测试环境的数据,然后找到项目中的所有的测试用例,然后建立测试数据库并执行测试用例,然后测试用例运行完成后清理数据库数据,最后恢复删除掉配置的测试环境。大致流程如上所述,详细的细节大家可自行阅读,本文难免有疏漏,请批评指正。

转载地址:https://blog.csdn.net/qq_33339479/article/details/82116376 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Django源码分析9:model.py表结构的初始化概述
下一篇:Django源码分析7:migrate命令的浅析

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月30日 16时44分34秒