Django源码分析10:makemigrations命令概述
发布日期:2021-07-25 13:04:47 浏览次数:9 分类:技术文章

本文共 29459 字,大约阅读时间需要 98 分钟。

django源码分析

本文环境python3.5.2,django1.10.x系列

django源码分析-makemigrations命令概述

Django项目中的数据库管理命令就是通过makemigrations来实现的,通过调用该命令可以对Django中的app的model表进行改动后生成相关连的migrations文件,然后通过调用migrate命令执行migrations中对数据库中操作。本次分析的过程基于上文的celery_django中的celery_app中的model文件中的表结构,makemigrations的大概的思路是:根据现有的migrations文件,来判断哪些数据库和数据库对应的字段已经执行过了,然后通过导入每个app中的model的表结构,依次对比来判断是否有表的更改或者字段的变更或者新建表等操作,通过对比结果然后生成migrations文件,此时通过migrate命令就可以达到更改数据库的目的。

大致的流程如上所述,本文就举其中最简单的生成celery_app中的第一个migrations文件为例。

makemigrations的基本执行流程

首先查看对应命令的handle处理过程如下;

def handle(self, *app_labels, **options):        self.verbosity = options['verbosity']                               # 配置信息        self.interactive = options['interactive']                           # 是否需要用户终端输入确认        self.dry_run = options['dry_run']           self.merge = options['merge']                                       # 是否是merge冲突        self.empty = options['empty']                                       # 是否是生成empty的        self.migration_name = options['name']                               # 生成的migration的文件名称        self.exit_code = options['exit_code']                               # 退出码        check_changes = options['check_changes']                            # 检查变化        if self.exit_code:             warnings.warn(                "The --exit option is deprecated in favor of the --check option.",                RemovedInDjango20Warning            )        # Make sure the app they asked for exists        app_labels = set(app_labels)                                        # 获取app的标签        bad_app_labels = set()                                              # bad app集合        for app_label in app_labels:                                        # 遍历app列表            try:                apps.get_app_config(app_label)                              # 尝试获取app配置            except LookupError:                bad_app_labels.add(app_label)                               # 如果获取不到则添加到bad中        if bad_app_labels:                                                  # 如果有Bad则打印出信息后退出            for app_label in bad_app_labels:                self.stderr.write("App '%s' could not be found. Is it in INSTALLED_APPS?" % app_label)            sys.exit(2)        # Load the current graph state. Pass in None for the connection so        # the loader doesn't try to resolve replaced migrations from DB.        loader = MigrationLoader(None, ignore_no_migrations=True)           # 初始化一个loader        # Raise an error if any migrations are applied before their dependencies.        consistency_check_labels = set(config.label for config in apps.get_app_configs())   # 获取所有需要检查的app标签        # Non-default databases are only checked if database routers used.        aliases_to_check = connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS]         # 获取检查的数据库        for alias in sorted(aliases_to_check):            connection = connections[alias]            if (connection.settings_dict['ENGINE'] != 'django.db.backends.dummy' and                    # At least one app must be migrated to the database.                    any(router.allow_migrate(connection.alias, label) for label in consistency_check_labels)):                loader.check_consistent_history(connection)                         # 检查数据库中已经执行过的app的migrations        # Before anything else, see if there's conflicting apps and drop out        # hard if there are any and they don't want to merge        conflicts = loader.detect_conflicts()                                       # 检查是否有冲突        # If app_labels is specified, filter out conflicting migrations for unspecified apps        if app_labels:            conflicts = {                app_label: conflict for app_label, conflict in iteritems(conflicts)                if app_label in app_labels            }                                                                       # 查出冲突的app        if conflicts and not self.merge:                                            # 如果有冲突并且没有传入merge参数 则打印信息后报错            name_str = "; ".join(                "%s in %s" % (", ".join(names), app)                for app, names in conflicts.items()            )            raise CommandError(                "Conflicting migrations detected; multiple leaf nodes in the "                "migration graph: (%s).\nTo fix them run "                "'python manage.py makemigrations --merge'" % name_str            )        # If they want to merge and there's nothing to merge, then politely exit        if self.merge and not conflicts:                                            # 如果传入merge参数但是没有冲突则打印信息返回            self.stdout.write("No conflicts detected to merge.")            return        # If they want to merge and there is something to merge, then        # divert into the merge code        if self.merge and conflicts:            return self.handle_merge(loader, conflicts)                             # 传入merge并有冲突则解决冲突后返回        if self.interactive:                                                        # 是否需要用户输入确认信息            questioner = InteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run)        else:            questioner = NonInteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run)        # Set up autodetector        autodetector = MigrationAutodetector(            loader.project_state(),            ProjectState.from_apps(apps),            questioner,        )                                                                           # 初始化生成migraitons类,传入的参数就是从migraiotn加载的旧表内容,并再传入通过model重新加载的新表内容,通过对比生成migrations        # If they want to make an empty migration, make one for each app        if self.empty:                                                              # 是否生成空的内容            if not app_labels:                raise CommandError("You must supply at least one app label when using --empty.")            # Make a fake changes() result we can pass to arrange_for_graph            changes = {                app: [Migration("custom", app)]                for app in app_labels            }            changes = autodetector.arrange_for_graph(                changes=changes,                graph=loader.graph,                migration_name=self.migration_name,            )            self.write_migration_files(changes)            return        # Detect changes        changes = autodetector.changes(            graph=loader.graph,            trim_to_apps=app_labels or None,            convert_apps=app_labels or None,            migration_name=self.migration_name,        )                                                                   # 通过比较新表和旧表的字段来获取changs        if not changes:                                                     # 没有改变则打印相关信息后退出            # No changes? Tell them.            if self.verbosity >= 1:                if len(app_labels) == 1:                    self.stdout.write("No changes detected in app '%s'" % app_labels.pop())                elif len(app_labels) > 1:                    self.stdout.write("No changes detected in apps '%s'" % ("', '".join(app_labels)))                else:                    self.stdout.write("No changes detected")            if self.exit_code:                sys.exit(1)        else:            self.write_migration_files(changes)                             # 将changes写入到migraitons文件中            if check_changes:                sys.exit(1)

主要分析下,生成changes的过程即如下代码的执行过程;

# Detect changes    changes = autodetector.changes(        graph=loader.graph,        trim_to_apps=app_labels or None,        convert_apps=app_labels or None,        migration_name=self.migration_name,    )

该方法如下;

def changes(self, graph, trim_to_apps=None, convert_apps=None, migration_name=None):    """    Main entry point to produce a list of applicable changes.    Takes a graph to base names on and an optional set of apps    to try and restrict to (restriction is not guaranteed)    """    changes = self._detect_changes(convert_apps, graph)                     # 生成改变信息    changes = self.arrange_for_graph(changes, graph, migration_name)            if trim_to_apps:        changes = self._trim_to_apps(changes, trim_to_apps)    return changes

主要先调用了对比生成新的changes,然后再对changes进行依赖等信息进行调整,首先查看_detect_changes方法;

def _detect_changes(self, convert_apps=None, graph=None):    """    Returns a dict of migration plans which will achieve the    change from from_state to to_state. The dict has app labels    as keys and a list of migrations as values.    The resulting migrations aren't specially named, but the names    do matter for dependencies inside the set.    convert_apps is the list of apps to convert to use migrations    (i.e. to make initial migrations for, in the usual case)    graph is an optional argument that, if provided, can help improve    dependency generation and avoid potential circular dependencies.    """    # The first phase is generating all the operations for each app    # and gathering them into a big per-app list.    # We'll then go through that list later and order it and split    # into migrations to resolve dependencies caused by M2Ms and FKs.    self.generated_operations = {}    # Prepare some old/new state and model lists, separating    # proxy models and ignoring unmigrated apps.    self.old_apps = self.from_state.concrete_apps                   # 获取migrations中apps    self.new_apps = self.to_state.apps                              # 获取现在的apps    self.old_model_keys = []                                        # 旧model的key    self.old_proxy_keys = []                                        # 旧model代理的key    self.old_unmanaged_keys = []                                        self.new_model_keys = []    self.new_proxy_keys = []    self.new_unmanaged_keys = []    for al, mn in sorted(self.from_state.models.keys()):            # 遍历旧models        model = self.old_apps.get_model(al, mn)                     # 获取该model        if not model._meta.managed:                                 # 是否是managed默认为True            self.old_unmanaged_keys.append((al, mn))        elif al not in self.from_state.real_apps:                   # 判断al是否在real_apps中            if model._meta.proxy:                                   # 如果是代理                self.old_proxy_keys.append((al, mn))                # 添加到代理中            else:                self.old_model_keys.append((al, mn))                # 添加到旧modelkey中    for al, mn in sorted(self.to_state.models.keys()):              # 遍历新Models        model = self.new_apps.get_model(al, mn)        if not model._meta.managed:            self.new_unmanaged_keys.append((al, mn))        elif (            al not in self.from_state.real_apps or            (convert_apps and al in convert_apps)        ):                                                          # 如果不在real_appas中并且不在al在convert_apps中            if model._meta.proxy:                self.new_proxy_keys.append((al, mn))            else:                self.new_model_keys.append((al, mn))    # Renames have to come first    self.generate_renamed_models()                                  # 检查是否有重命名model    # Prepare lists of fields and generate through model map    self._prepare_field_lists()                                     # 先获取所有Model的field_lists    self._generate_through_model_map()                              # 检查是否有通过through的model    # Generate non-rename model operations    self.generate_deleted_models()                                  # 检查是否有删除的model    self.generate_created_models()                                  # 检查是否有新建的model    self.generate_deleted_proxies()                                 # 检查是否有删除的代理model    self.generate_created_proxies()                                 # 检查是否有新建的代理model    self.generate_altered_options()                                     self.generate_altered_managers()                                # 检查是否有更换manager的model    # Generate field operations    self.generate_renamed_fields()                                  # 检查是否有重命名的字段    self.generate_removed_fields()                                  # 检查是否有要删除的字段    self.generate_added_fields()                                    # 检查是否有要添加的字段    self.generate_altered_fields()                                  # 检查是否有更改的字段    self.generate_altered_unique_together()                         # 是否需要更改unique    self.generate_altered_index_together()                          # 检查是否更改index    self.generate_altered_db_table()                                # 检查是否有更改表明的    self.generate_altered_order_with_respect_to()       self._sort_migrations()                                         # 排序    self._build_migration_list(graph)                               # 通过依赖添加到graph中    self._optimize_migrations()                                     # 优化    return self.migrations                                          # 返回migraiotns

该方法就是对比所有变更的处理函数,其中获取所有旧表和新表的字段处理函数如下;

def _prepare_field_lists(self):    """    Prepare field lists, and prepare a list of the fields that used    through models in the old state so we can make dependencies    from the through model deletion to the field that uses it.    """    self.kept_model_keys = set(self.old_model_keys).intersection(self.new_model_keys)           # 获取old_model_keys和new_model_keys的交集    self.kept_proxy_keys = set(self.old_proxy_keys).intersection(self.new_proxy_keys)           # 获取old_proxy_keys和new_proxy_keys的交集    self.kept_unmanaged_keys = set(self.old_unmanaged_keys).intersection(self.new_unmanaged_keys)    self.through_users = {}    self.old_field_keys = set()    self.new_field_keys = set()    for app_label, model_name in sorted(self.kept_model_keys):                                  # 遍历        old_model_name = self.renamed_models.get((app_label, model_name), model_name)           # 从更名Model中获取旧的model名        old_model_state = self.from_state.models[app_label, old_model_name]                     # 获取旧的state        new_model_state = self.to_state.models[app_label, model_name]                           # 获取新的state        self.old_field_keys.update((app_label, model_name, x) for x, y in old_model_state.fields)       # 遍历旧表的所有字段 并添加到old_field_keys中 为后续比较做准备        self.new_field_keys.update((app_label, model_name, x) for x, y in new_model_state.fields)       # 从新表中获取所有的字段 并添加到new_field_keys中

该函数就是将旧表和新表的所有的字段进行了添加到字段中,方便后续比较是否有字段新增或者变更等,当字段加载完成后,举例以新建Model为例;

def generate_created_models(self):    """    Find all new models (both managed and unmanaged) and make create    operations for them as well as separate operations to create any    foreign key or M2M relationships (we'll optimize these back in later    if we can).    We also defer any model options that refer to collections of fields    that might be deferred (e.g. unique_together, index_together).    """    old_keys = set(self.old_model_keys).union(self.old_unmanaged_keys)          # 获取所有的旧model的keys    added_models = set(self.new_model_keys) - old_keys                          # 获取需要添加的Models名称    added_unmanaged_models = set(self.new_unmanaged_keys) - old_keys            # 获取需要添加的unmanaged的model的key    all_added_models = chain(        sorted(added_models, key=self.swappable_first_key, reverse=True),        sorted(added_unmanaged_models, key=self.swappable_first_key, reverse=True)    )                                                                           # 获取排序后所有需要添加的models    for app_label, model_name in all_added_models:                              # 依次遍历所有的Models        model_state = self.to_state.models[app_label, model_name]               # 获取model_state实例        model_opts = self.new_apps.get_model(app_label, model_name)._meta       # 获取对应model实例的_meta属性        # Gather related fields        related_fields = {}                                                     primary_key_rel = None        for field in model_opts.local_fields:                                   # 获取所有的local_fields            if field.remote_field:                                              # 是否是remote_field类型                if field.remote_field.model:                                    # 获取对应的model                    if field.primary_key:                                       # 如果是主键                        primary_key_rel = field.remote_field.model              # 设置主键                    elif not field.remote_field.parent_link:                    # 如果不在parent_link中                        related_fields[field.name] = field                      # 设置关联字段的字段                # through will be none on M2Ms on swapped-out models;                # we can treat lack of through as auto_created=True, though.                if (getattr(field.remote_field, "through", None) and                              not field.remote_field.through._meta.auto_created):     # 如果有through 或者自增                    related_fields[field.name] = field                          # 添加到关联字段中        for field in model_opts.local_many_to_many:                             # 遍历多对多字段            if field.remote_field.model:                                        # 如果是remote_field                related_fields[field.name] = field                              # 设置到关联字典中            if getattr(field.remote_field, "through", None) and not field.remote_field.through._meta.auto_created:                related_fields[field.name] = field        # Are there unique/index_together to defer?        unique_together = model_state.options.pop('unique_together', None)      # 获取unique_together属性内容        index_together = model_state.options.pop('index_together', None)        # 获取index_together内容        order_with_respect_to = model_state.options.pop('order_with_respect_to', None)        # Depend on the deletion of any possible proxy version of us        dependencies = [            (app_label, model_name, None, False),                               # 创建依赖属性值        ]        # Depend on all bases        for base in model_state.bases:                                          # 依次访问父类依赖            if isinstance(base, six.string_types) and "." in base:              #                 base_app_label, base_name = base.split(".", 1)                dependencies.append((base_app_label, base_name, None, True))    # 依次添加父类依赖信息        # Depend on the other end of the primary key if it's a relation        if primary_key_rel:            dependencies.append((                primary_key_rel._meta.app_label,                primary_key_rel._meta.object_name,                None,                True            ))        # Generate creation operation        self.add_operation(            app_label,            operations.CreateModel(                name=model_state.name,                fields=[d for d in model_state.fields if d[0] not in related_fields],                options=model_state.options,                bases=model_state.bases,                managers=model_state.managers,            ),            dependencies=dependencies,            beginning=True,        )                                                                       # 添加生成数据表的操作,并添加依赖        # Don't add operations which modify the database for unmanaged models        if not model_opts.managed:                                              # 如果是managed为false则下一个            continue        # Generate operations for each related field        for name, field in sorted(related_fields.items()):                      # 处理关联字段值            dependencies = self._get_dependecies_for_foreign_key(field)         # 获取依赖外键            # Depend on our own model being created            dependencies.append((app_label, model_name, None, True))            # 添加到依赖中            # Make operation            self.add_operation(                app_label,                operations.AddField(                    model_name=model_name,                    name=name,                    field=field,                ),                dependencies=list(set(dependencies)),            )                                                                   # 新增依赖字段值        # Generate other opns        related_dependencies = [            (app_label, model_name, name, True)            for name, field in sorted(related_fields.items())        ]                                                                               related_dependencies.append((app_label, model_name, None, True))        if unique_together:                                                     # 如果设置了unique_together            self.add_operation(                app_label,                operations.AlterUniqueTogether(                    name=model_name,                    unique_together=unique_together,                ),                dependencies=related_dependencies            )                                                                   # 添加unique_together字段配置        if index_together:                                                      # 如果有索引配置信息则配置            self.add_operation(                app_label,                operations.AlterIndexTogether(                    name=model_name,                    index_together=index_together,                ),                dependencies=related_dependencies            )        if order_with_respect_to:            self.add_operation(                app_label,                operations.AlterOrderWithRespectTo(                    name=model_name,                    order_with_respect_to=order_with_respect_to,                ),                dependencies=[                    (app_label, model_name, order_with_respect_to, True),                    (app_label, model_name, None, True),                ]            )

当生成的操作都是通过add_operation函数,添加到generated_operations列表中;

def add_operation(self, app_label, operation, dependencies=None, beginning=False):    # Dependencies are (app_label, model_name, field_name, create/delete as True/False)    operation._auto_deps = dependencies or []    if beginning:        self.generated_operations.setdefault(app_label, []).insert(0, operation)    else:        self.generated_operations.setdefault(app_label, []).append(operation)

当处理完成依赖关系后,通过makemigraiotns.py中的Command的write_migration_files方法将对应的更改内容写入到文件中;

def write_migration_files(self, changes):    """    Takes a changes dict and writes them out as migration files.    """    directory_created = {}    for app_label, app_migrations in changes.items():                           # 获取每一个改变        if self.verbosity >= 1:                                                 # 如果输出格式大于等于1则打印相关信息            self.stdout.write(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label) + "\n")        for migration in app_migrations:                                        # 获取所有的migration            # Describe the migration            writer = MigrationWriter(migration)                                 # 初始化一个writer            if self.verbosity >= 1:                                             # 如果大于等于1则打印需要写入文件中的信息                # Display a relative path if it's below the current working                # directory, or an absolute path otherwise.                migration_string = os.path.relpath(writer.path)                if migration_string.startswith('..'):                    migration_string = writer.path                self.stdout.write("  %s:\n" % (self.style.MIGRATE_LABEL(migration_string),))                for operation in migration.operations:                    self.stdout.write("    - %s\n" % operation.describe())            if not self.dry_run:                # Write the migrations file to the disk.                migrations_directory = os.path.dirname(writer.path)             # 判断文件夹名称                if not directory_created.get(app_label):                        # 文件是否能够获取到                    if not os.path.isdir(migrations_directory):                 # 如果不是文件夹则创建一个                        os.mkdir(migrations_directory)                    init_path = os.path.join(migrations_directory, "__init__.py")   # 添加__init__.py文件                    if not os.path.isfile(init_path):                               # 如果不是文件                        open(init_path, "w").close()                                # 打开一下                    # We just do this once per app                    directory_created[app_label] = True                             # 是否创建了设置为True                migration_string = writer.as_string()                               # 转换成string                with open(writer.path, "wb") as fh:                                 # 打开对应的文件                    fh.write(migration_string)                                      # 写入migraion_string            elif self.verbosity == 3:                # Alternatively, makemigrations --dry-run --verbosity 3                # will output the migrations to stdout rather than saving                # the file to the disk.                self.stdout.write(self.style.MIGRATE_HEADING(                    "Full migrations file '%s':" % writer.filename) + "\n"                )                self.stdout.write("%s\n" % writer.as_string())

其中主要是将需要写入的migrations实例化了一个MigrationWriter,

并最后调用as_string方法生成渲染的字符数据;

def as_string(self):    """    Returns a string of the file contents.    """    items = {        "replaces_str": "",        "initial_str": "",    }    imports = set()    # Deconstruct operations    operations = []    for operation in self.migration.operations:        operation_string, operation_imports = OperationWriter(operation).serialize()            # 序列化        imports.update(operation_imports)                                                       # 更新对应的内容        operations.append(operation_string)                                                     # 添加到operations中    items["operations"] = "\n".join(operations) + "\n" if operations else ""                    # 添加信息    # Format dependencies and write out swappable dependencies right    dependencies = []    for dependency in self.migration.dependencies:                                              # 获取依赖信息        if dependency[0] == "__setting__":                                                      # 如果第一个为__setting__            dependencies.append("        migrations.swappable_dependency(settings.%s)," % dependency[1])    # 添加依赖信息            imports.add("from django.conf import settings")                                     # 添加导入信息        else:            # No need to output bytestrings for dependencies            dependency = tuple(force_text(s) for s in dependency)            dependencies.append("        %s," % self.serialize(dependency)[0])    items["dependencies"] = "\n".join(dependencies) + "\n" if dependencies else ""              # 添加依赖信息    # Format imports nicely, swapping imports of functions from migration files    # for comments    migration_imports = set()    for line in list(imports):                                                                  # 获取所有的导入信息        if re.match("^import (.*)\.\d+[^\s]*$", line):                                             # 正则匹配            migration_imports.add(line.split("import")[1].strip())                              # 添加到migration_imports中            imports.remove(line)                                                                # 从imports中移除            self.needs_manual_porting = True    # django.db.migrations is always used, but models import may not be.    # If models import exists, merge it with migrations import.    if "from django.db import models" in imports:        imports.discard("from django.db import models")                                         # 移除该内容        imports.add("from django.db import migrations, models")                                 # 添加该内容    else:        imports.add("from django.db import migrations")    # Sort imports by the package / module to be imported (the part after    # "from" in "from ... import ..." or after "import" in "import ...").    sorted_imports = sorted(imports, key=lambda i: i.split()[1])                                # 导入排序    items["imports"] = "\n".join(sorted_imports) + "\n" if imports else ""                      # 安全导入    if migration_imports:        items["imports"] += (            "\n\n# Functions from the following migrations need manual "            "copying.\n# Move them and any dependencies into this file, "            "then update the\n# RunPython operations to refer to the local "            "versions:\n# %s"        ) % "\n# ".join(sorted(migration_imports))                                              # 添加导入信息    # If there's a replaces, make a string for it    if self.migration.replaces:        items['replaces_str'] = "\n    replaces = %s\n" % self.serialize(self.migration.replaces)[0]    # Hinting that goes into comment    items.update(        version=get_version(),        timestamp=now().strftime("%Y-%m-%d %H:%M"),    )                                                                                           # 更新版本信息    if self.migration.initial:        items['initial_str'] = "\n    initial = True\n"                                         # 设置initial    return (MIGRATION_TEMPLATE % items).encode("utf8")                                          # 通过MIGRATION_TEMPLATE序列化

其中MIGRATION_TEMPLATE内容如下;

MIGRATION_TEMPLATE = """\# -*- coding: utf-8 -*-# Generated by Django %(version)s on %(timestamp)sfrom __future__ import unicode_literals%(imports)sclass Migration(migrations.Migration):%(replaces_str)s%(initial_str)s    dependencies = [%(dependencies)s\    ]    operations = [%(operations)s\    ]"""

至此一个新建Model的大致流程分析完成,其中没有详细分析依赖的解决等细节内容,大家有兴趣可自行查看。

总结

本文只是简单的介绍了一下新建model的migrations文件的生成过程,并未讨论到更复杂的应用场景,如复杂的应用场景可根据上文的执行流程进行详细的分析,由于本人水平有限,并没有使用到数据库应用很复杂的场景,所以对数据库中一些的处理也缺乏认识,如有疏漏请批评指正。

转载地址:https://blog.csdn.net/qq_33339479/article/details/82291671 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:djangorestframework源码分析1:generics中的view执行流程
下一篇:Django源码分析9:model.py表结构的初始化概述

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年04月11日 07时08分24秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章