Django源码分析7:migrate命令的浅析
发布日期:2021-07-25 13:04:45 浏览次数:11 分类:技术文章

本文共 41143 字,大约阅读时间需要 137 分钟。

django源码分析

本文环境python3.5.2,django1.10.x系列

django源码分析-migrate命令分析

Django项目中提供了,通过migrations操作数据库的结构的命令-migrate,该命令可以将生成的migrations直接映射到数据库中,对相关数据库和表结构进行相关的修改,并且在本地调用runserver调试运行的时候,都会进行migrations的检查,检查是否还有相关的数据库migrations未被执行,此时先从这里说起。本文按照mysql案例进行说明,相关的数据库配置如下:

DATABASES = {    'default': {        'ENGINE': 'django.db.backends.mysql',        'NAME': 'project',        'USER': 'test',        'PASSWORD': 'test',        'HOST': '127.0.0.1',        'PORT': '3306',        "ATOMIC_REQUESTS": True,    }}

BaseCommand的check_migrations

由于django中支持的命令都是继承自BaseCommand类,该类中默认在执行的过程中,在调用execute函数时,

if self.requires_migrations_checks:            self.check_migrations()

其中,self.requires_migrations_checks默认为True,此时就会执行check_migrations函数,

def check_migrations(self):    """    Print a warning if the set of migrations on disk don't match the    migrations in the database.    """    from django.db.migrations.executor import MigrationExecutor    try:        executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])                # 实例化一个executor类    except ImproperlyConfigured:        # No databases are configured (or the dummy one)        return    except MigrationSchemaMissing:        self.stdout.write(self.style.NOTICE(            "\nNot checking migrations as it is not possible to access/create the django_migrations table."        ))        return    plan = executor.migration_plan(executor.loader.graph.leaf_nodes())             # 获取执行的plan    if plan:        apps_waiting_migration = sorted(set(migration.app_label for migration, backwards in plan))        self.stdout.write(            self.style.NOTICE(                "\nYou have %(unpplied_migration_count)s unapplied migration(s). "                "Your project may not work properly until you apply the "                "migrations for app(s): %(apps_waiting_migration)s." % {                    "unpplied_migration_count": len(plan),                    "apps_waiting_migration": ", ".join(apps_waiting_migration),                }            )                                                                       # 打印具体的信息        )

先查看connections的定义,DEFAULT_DB_ALIAS则默认为default

connections = ConnectionHandler()

此时,connections[DEFAULT_DB_ALIAS]调用了ConnectionHandler的getitem方法,

def __getitem__(self, alias):    if hasattr(self._connections, alias):               # 如果已经设置过该属性则直接获取        return getattr(self._connections, alias)    self.ensure_defaults(alias)                         # 获取配置文件中的数据库配置    self.prepare_test_settings(alias)                   # 获取test数据库的相关配置    db = self.databases[alias]                          # 获取对应alias的数据库配置    backend = load_backend(db['ENGINE'])                # 获取选用了哪种数据库连接    conn = backend.DatabaseWrapper(db, alias)           # 实例该连接         setattr(self._connections, alias, conn)             # 设置该属性    return conn

查看self.ensure_defaults函数,

def ensure_defaults(self, alias):    """    Puts the defaults into the settings dictionary for a given connection    where no settings is provided.    """    try:        conn = self.databases[alias]                                    # 获取配置文件中的信息    except KeyError:        raise ConnectionDoesNotExist("The connection %s doesn't exist" % alias)    conn.setdefault('ATOMIC_REQUESTS', False)                           # 是否每个请求开启一个事务    conn.setdefault('AUTOCOMMIT', True)                                 # 是否自动提交    conn.setdefault('ENGINE', 'django.db.backends.dummy')               # 数据库的连接模型        if conn['ENGINE'] == 'django.db.backends.' or not conn['ENGINE']:           conn['ENGINE'] = 'django.db.backends.dummy'    conn.setdefault('CONN_MAX_AGE', 0)                                  # 连接的最大时间    conn.setdefault('OPTIONS', {})                                      # 选项设置    conn.setdefault('TIME_ZONE', None)                                  # 时区设置    for setting in ['NAME', 'USER', 'PASSWORD', 'HOST', 'PORT']:        # 设置连接设置        conn.setdefault(setting, '')

继续查看self.prepare_test_settings函数处理,该函数主要是对使用test时,新建数据库的相关配置,如字符集的设置等;

def prepare_test_settings(self, alias):    """    Makes sure the test settings are available in the 'TEST' sub-dictionary.    """    try:        conn = self.databases[alias]    except KeyError:        raise ConnectionDoesNotExist("The connection %s doesn't exist" % alias)    test_settings = conn.setdefault('TEST', {})                     # 获取TEST配置信息    for key in ['CHARSET', 'COLLATION', 'NAME', 'MIRROR']:          # 获取TEST中的字符集名称等配置信息        test_settings.setdefault(key, None)

继续查看backend = load_backend(db[‘ENGINE’]),该行代码就是导入配置中的数据集连接,本文选用的是mysql;

def load_backend(backend_name):    """    Return a database backend's "base" module given a fully qualified database    backend name, or raise an error if it doesn't exist.    """    # This backend was renamed in Django 1.9.    if backend_name == 'django.db.backends.postgresql_psycopg2':                # 转换一下数据库类型        backend_name = 'django.db.backends.postgresql'    try:        return import_module('%s.base' % backend_name)                          # 导入选用的模型的base,本文选用的是django.db.backends.mysql    except ImportError as e_user:        # The database backend wasn't found. Display a helpful error message        # listing all possible (built-in) database backends.        ...

执行完成后的backend就是mysql.base模块,导入完成后就执行 conn = backend.DatabaseWrapper(db, alias),此时就是寻找mysql.base中的DatabaseWrapper类,此时返回的就是该实例conn。

然后查看MigrationExecutor类,传入的参数就是刚刚初始化完成的conn,

class MigrationExecutor(object):    """    End-to-end migration execution - loads migrations, and runs them    up or down to a specified set of targets.    """    def __init__(self, connection, progress_callback=None):        self.connection = connection                            # 数据库初始化实例        self.loader = MigrationLoader(self.connection)          # 初始化loader,建立migrations的图关系        self.recorder = MigrationRecorder(self.connection)      # 初始化recorder        self.progress_callback = progress_callback              # 回调函数

先查看MigrationLoader的初始化过程,

class MigrationLoader(object):    def __init__(self, connection, load=True, ignore_no_migrations=False):        self.connection = connection                                # 数据库实例        self.disk_migrations = None        self.applied_migrations = None        self.ignore_no_migrations = ignore_no_migrations        if load:                                                    self.build_graph()                                      # 默认建立graph

此时默认传入的load为True,则调用build_graph函数,

def build_graph(self):    """    Builds a migration dependency graph using both the disk and database.    You'll need to rebuild the graph if you apply migrations. This isn't    usually a problem as generally migration stuff runs in a one-shot process.    """    # Load disk data    self.load_disk()                                                # 从磁盘中加载migrations中的文件    # Load database data    if self.connection is None:                                     # 如果conn实例为空        self.applied_migrations = set()                             # 默认设置set集合    else:        recorder = MigrationRecorder(self.connection)               # 初始化recorder        self.applied_migrations = recorder.applied_migrations()     # 获取已经在django_migrations中的app的migraiton    # To start, populate the migration graph with nodes for ALL migrations    # and their dependencies. Also make note of replacing migrations at this step.    self.graph = MigrationGraph()                                   # 初始化一个图对象    self.replacements = {}    for key, migration in self.disk_migrations.items():             # 获取加载的每个值        self.graph.add_node(key, migration)                         # 添加到graph中        # Internal (aka same-app) dependencies.        self.add_internal_dependencies(key, migration)              # 调价该migration的依赖        # Replacing migrations.        if migration.replaces:                                      # 检查migration是否有replaces            self.replacements[key] = migration                      # 保存替换的内容    # Add external dependencies now that the internal ones have been resolved.    for key, migration in self.disk_migrations.items():                     self.add_external_dependencies(key, migration)              # 添加依赖    # Carry out replacements where possible.    for key, migration in self.replacements.items():                # 如果replacements有值        # Get applied status of each of this migration's replacement targets.        applied_statuses = [(target in self.applied_migrations) for target in migration.replaces]   # 检查replace中的target是否已经执行过        # Ensure the replacing migration is only marked as applied if all of        # its replacement targets are.        if all(applied_statuses):                                   # 如果都执行过了            self.applied_migrations.add(key)                        # 添加该key        else:            self.applied_migrations.discard(key)                    # 除去该key        # A replacing migration can be used if either all or none of its        # replacement targets have been applied.        if all(applied_statuses) or (not any(applied_statuses)):            self.graph.remove_replaced_nodes(key, migration.replaces)  # 去除该node        else:            # This replacing migration cannot be used because it is partially applied.            # Remove it from the graph and remap dependencies to it (#25945).            self.graph.remove_replacement_node(key, migration.replaces)      # Ensure the graph is consistent.    try:        self.graph.validate_consistency()                           # 检查是否有空的节点    except NodeNotFoundError as exc:        ...

该函数主要就是先加载存在的migraitons中的文件,然后查询在django_migrations表中的数据,建立相关的graph,
查看load_disk函数,该函数主要是将migrations中的文件加载,并初始化,

def load_disk(self):    """    Loads the migrations from all INSTALLED_APPS from disk.    """    self.disk_migrations = {}                                   self.unmigrated_apps = set()                                    # 没有migrate的apps集合    self.migrated_apps = set()                                      # 已经migrate的apps集合    for app_config in apps.get_app_configs():                       # 遍历配置加载的apps        # Get the migrations module directory        module_name = self.migrations_module(app_config.label)      # 获取module名称        if module_name is None:                                     # 如果migrations没有            self.unmigrated_apps.add(app_config.label)              # 添加到unmigrated_apps列表中            continue                                                # 继续下一个循环        was_loaded = module_name in sys.modules                     # 该module是否已经在sys.modules中        try:            module = import_module(module_name)                     # 导入app.migrations        except ImportError as e:            # I hate doing this, but I don't want to squash other import errors.            # Might be better to try a directory check directly.            if "No module named" in str(e) and MIGRATIONS_MODULE_NAME in str(e):                self.unmigrated_apps.add(app_config.label)          # 导入出错则添加后并继续下一个                continue            raise        else:            # PY3 will happily import empty dirs as namespaces.            if not hasattr(module, '__file__'):                self.unmigrated_apps.add(app_config.label)                continue            # Module is not a package (e.g. migrations.py).            if not hasattr(module, '__path__'):                self.unmigrated_apps.add(app_config.label)                continue            # Force a reload if it's already loaded (tests need this)            if was_loaded:                                          # 如果已经导入                six.moves.reload_module(module)                     # 重新导入        self.migrated_apps.add(app_config.label)                    # 添加到导入app列表中        directory = os.path.dirname(module.__file__)                # 获取模块的文件路径        # Scan for .py files          migration_names = set()                                     # 对应module下的所有migrations文件        for name in os.listdir(directory):                          # 遍历所有文件            if name.endswith(".py"):                                # 如果是.py结尾                import_name = name.rsplit(".", 1)[0]                # 获取除去.py的文件名                if import_name[0] not in "_.~":                                         migration_names.add(import_name)                # 添加到migrations_names中        # Load them        for migration_name in migration_names:                      # 遍历migration_names            migration_module = import_module("%s.%s" % (module_name, migration_name))   # 导入对应的migration文件            if not hasattr(migration_module, "Migration"):          # 如果没有Migration则报错                raise BadMigrationError(                    "Migration %s in app %s has no Migration class" % (migration_name, app_config.label)                )            self.disk_migrations[app_config.label, migration_name] = migration_module.Migration(                migration_name,                app_config.label,            )                                                       # 初始化module中的Migration并保存到disk_migrations中

load_disk函数就将migrations文件下的文件导入,并初始化了文件中的Migration类,并通过app名称和migration文件名称做key保存该实例。

执行完成后,通过MigrationRecorder类实例来获取已经被执行的migrations;

class MigrationRecorder(object):    """    Deals with storing migration records in the database.    Because this table is actually itself used for dealing with model    creation, it's the one thing we can't do normally via migrations.    We manually handle table creation/schema updating (using schema backend)    and then have a floating model to do queries with.    If a migration is unapplied its row is removed from the table. Having    a row in the table always means a migration is applied.    """    @python_2_unicode_compatible    class Migration(models.Model):                                  # django_migrations表        app = models.CharField(max_length=255)                      # app名称        name = models.CharField(max_length=255)                     # migration名称        applied = models.DateTimeField(default=now)                 # 应用的时间        class Meta:            apps = Apps()            app_label = "migrations"            db_table = "django_migrations"        def __str__(self):            return "Migration %s for %s" % (self.name, self.app)    def __init__(self, connection):        self.connection = connection    @property    def migration_qs(self):        return self.Migration.objects.using(self.connection.alias)    def ensure_schema(self):        """        Ensures the table exists and has the correct schema.        """        # If the table's there, that's fine - we've never changed its schema        # in the codebase.        if self.Migration._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()):            # 如果该django_migrations表已经存在            return        # Make the table        try:            with self.connection.schema_editor() as editor:                editor.create_model(self.Migration)     # 创建该django_migrations表        except DatabaseError as exc:            raise MigrationSchemaMissing("Unable to create the django_migrations table (%s)" % exc)    def applied_migrations(self):        """        Returns a set of (app, name) of applied migrations.        """        self.ensure_schema()                                                            # 检查django_migrations表是否已经创建        return set(tuple(x) for x in self.migration_qs.values_list("app", "name"))      # 获取已经创建的migrations的app name值    ...

由此可知applied_migrations的值就是django_migrations表中的app name的值,由此可知已经被应用的列表。

至此MigrationExecutor大致的初始化流程就基本完成。

继续查看check_migrations函数执行,此时就会执行到;

plan = executor.migration_plan(executor.loader.graph.leaf_nodes())    if plan:        apps_waiting_migration = sorted(set(migration.app_label for migration, backwards in plan))

此时就会调用executor.migration_plan方法来检查是否有还没被执行的migrations,传入的参数executor.loader.graph.leaf_nodes就是所有的子节点的值,

def migration_plan(self, targets, clean_start=False):    """    Given a set of targets, returns a list of (Migration instance, backwards?).    """    plan = []    if clean_start:                                                     # 是否清空,默认不清空        applied = set()    else:        applied = set(self.loader.applied_migrations)                   # 获取loader的applied_migrations此时就是已经应用的migrations    for target in targets:                                              # 遍历每个节点        # If the target is (app_label, None), that means unmigrate everything        if target[1] is None:                                           # 如果app下的migration为空            for root in self.loader.graph.root_nodes():                 # 检查所有根节点                if root[0] == target[0]:                                # 如果找到该跟节点                    for migration in self.loader.graph.backwards_plan(root):    # 查找该root的migration                        if migration in applied:                                # 如果该migration已经执行                            plan.append((self.loader.graph.nodes[migration], True))  # plan添加需要执行                            applied.remove(migration)                                # 从已执行列表中删除该migration        # If the migration is already applied, do backwards mode,        # otherwise do forwards mode.        elif target in applied:                                                 # 遍历所有的applied            # Don't migrate backwards all the way to the target node (that            # may roll back dependencies in other apps that don't need to            # be rolled back); instead roll back through target's immediate            # child(ren) in the same app, and no further.            next_in_app = sorted(                n for n in                self.loader.graph.node_map[target].children                if n[0] == target[0]            )                                                                   # 病例node_map的children获取相同的migration并排序            for node in next_in_app:                                            # 遍历该列表                for migration in self.loader.graph.backwards_plan(node):        # 获取该node下的migration                    if migration in applied:                                    # 如果已经在applied中                        plan.append((self.loader.graph.nodes[migration], True))     # 添加到plan中去执行                        applied.remove(migration)                               # 从已执行列表中删除        else:            for migration in self.loader.graph.forwards_plan(target):           # 遍历migration                if migration not in applied:                                    # 如果不在applied中                    plan.append((self.loader.graph.nodes[migration], False))    # 添加到已执行列表中                    applied.add(migration)                                      # 添加到已经执行applied    return plan                                                                 # 返回plan

此时就获取了所有需要执行的plan,此时返回的plan就是需要执行的migration。如果返回的plan不为空则在标准输出中打印相关信息。

至此,通过命令执行时的check_migraitons函数基本分析完毕。

migrate命令分析

根据migrate命令的执行过程,可知,最终会调用migrate.py中的Command的handle方法执行其具体的业务处理,该函数的大致处理过程如下;

def handle(self, *args, **options):    self.verbosity = options['verbosity']    self.interactive = options['interactive']    # Import the 'management' module within each installed app, to register    # dispatcher events.    for app_config in apps.get_app_configs():                               # 导入app中的management的扩展命令        if module_has_submodule(app_config.module, "management"):            import_module('.management', app_config.name)                       # Get the database we're operating from    db = options['database']                                                # 获取需要操作的数据库    connection = connections[db]                                            # 获取选用数据库的实例    # Hook for backends needing any database preparation    connection.prepare_database()                                           # 调用该准备方法    # Work out which apps have migrations and which do not    executor = MigrationExecutor(connection, self.migration_progress_callback)      # 初始化一个MigrationExecutor实例    # Raise an error if any migrations are applied before their dependencies.    executor.loader.check_consistent_history(connection)                    # 检查是否有未被执行过的依赖migration    # Before anything else, see if there's conflicting apps and drop out    # hard if there are any    conflicts = executor.loader.detect_conflicts()                          # 检查是否有冲突    if conflicts:        name_str = "; ".join(            "%s in %s" % (", ".join(names), app)            for app, names in conflicts.items()        )                                                                   # 如果有冲突的migration则打印具体的信息        raise CommandError(            "Conflicting migrations detected; multiple leaf nodes in the "            "migration graph: (%s).\nTo fix them run "            "'python manage.py makemigrations --merge'" % name_str        )    # If they supplied command line arguments, work out what they mean.    target_app_labels_only = True                                           # 是否是对某个app进行migrate    if options['app_label'] and options['migration_name']:                  # 获取app和migration_name        app_label, migration_name = options['app_label'], options['migration_name']        if app_label not in executor.loader.migrated_apps:                  # 如果不在已经执行的migrated_apps中则报错            raise CommandError(                "App '%s' does not have migrations." % app_label            )        if migration_name == "zero":            targets = [(app_label, None)]        else:            try:                migration = executor.loader.get_migration_by_prefix(app_label, migration_name)      # 加载该app对应的migration_name            except AmbiguityError:                raise CommandError(                    "More than one migration matches '%s' in app '%s'. "                    "Please be more specific." %                    (migration_name, app_label)                )            except KeyError:                raise CommandError("Cannot find a migration matching '%s' from app '%s'." % (                    migration_name, app_label))            targets = [(app_label, migration.name)]                         # 设置targets        target_app_labels_only = False    elif options['app_label']:                                              # 如果是migrate某个app        app_label = options['app_label']        if app_label not in executor.loader.migrated_apps:                  # 检查是否在migrated_apps中            raise CommandError(                "App '%s' does not have migrations." % app_label            )        targets = [key for key in executor.loader.graph.leaf_nodes() if key[0] == app_label]    # 获取该app下所有的migration    else:        targets = executor.loader.graph.leaf_nodes()                        # 获取所有的nodes    plan = executor.migration_plan(targets)                                 # 获取所有待执行的plan    run_syncdb = options['run_syncdb'] and executor.loader.unmigrated_apps    # Print some useful info    if self.verbosity >= 1:                                                 # 根据参数打印相关信息        self.stdout.write(self.style.MIGRATE_HEADING("Operations to perform:"))        if run_syncdb:            self.stdout.write(                self.style.MIGRATE_LABEL("  Synchronize unmigrated apps: ") +                (", ".join(sorted(executor.loader.unmigrated_apps)))            )        if target_app_labels_only:            self.stdout.write(                self.style.MIGRATE_LABEL("  Apply all migrations: ") +                (", ".join(sorted(set(a for a, n in targets))) or "(none)")            )        else:            if targets[0][1] is None:                self.stdout.write(self.style.MIGRATE_LABEL(                    "  Unapply all migrations: ") + "%s" % (targets[0][0], )                )            else:                self.stdout.write(self.style.MIGRATE_LABEL(                    "  Target specific migration: ") + "%s, from %s"                    % (targets[0][1], targets[0][0])                )    pre_migrate_state = executor._create_project_state(with_applied_migrations=True)    # 创建一个projectstate对象    pre_migrate_apps = pre_migrate_state.apps                                           # 返回一个StateApps实例    emit_pre_migrate_signal(        self.verbosity, self.interactive, connection.alias, apps=pre_migrate_apps, plan=plan,    )    # Run the syncdb phase.    if run_syncdb:        if self.verbosity >= 1:            self.stdout.write(self.style.MIGRATE_HEADING("Synchronizing apps without migrations:"))        self.sync_apps(connection, executor.loader.unmigrated_apps)    # Migrate!    if self.verbosity >= 1:        self.stdout.write(self.style.MIGRATE_HEADING("Running migrations:"))    if not plan:        if self.verbosity >= 1:            self.stdout.write("  No migrations to apply.")            # If there's changes that aren't in migrations yet, tell them how to fix it.            autodetector = MigrationAutodetector(                executor.loader.project_state(),                ProjectState.from_apps(apps),            )            changes = autodetector.changes(graph=executor.loader.graph)            if changes:                self.stdout.write(self.style.NOTICE(                    "  Your models have changes that are not yet reflected "                    "in a migration, and so won't be applied."                ))                self.stdout.write(self.style.NOTICE(                    "  Run 'manage.py makemigrations' to make new "                    "migrations, and then re-run 'manage.py migrate' to "                    "apply them."                ))        fake = False        fake_initial = False    else:        fake = options['fake']        fake_initial = options['fake_initial']    post_migrate_state = executor.migrate(        targets, plan=plan, state=pre_migrate_state.clone(), fake=fake,        fake_initial=fake_initial,    )                                                               # 执行相关migrate的操作    post_migrate_apps = post_migrate_state.apps                         # Re-render models of real apps to include relationships now that    # we've got a final state. This wouldn't be necessary if real apps    # models were rendered with relationships in the first place.    with post_migrate_apps.bulk_update():        model_keys = []        for model_state in post_migrate_apps.real_models:            model_key = model_state.app_label, model_state.name_lower            model_keys.append(model_key)            post_migrate_apps.unregister_model(*model_key)    post_migrate_apps.render_multiple([        ModelState.from_model(apps.get_model(*model)) for model in model_keys    ])    # Send the post_migrate signal, so individual apps can do whatever they need    # to do at this point.    emit_post_migrate_signal(        self.verbosity, self.interactive, connection.alias, apps=post_migrate_apps, plan=plan,    )

在上一节中,已经分析过了check_migrations,与handler中的大致处理流程过程类似,在经历过数据库连接实例的初始化之后,会调用;

executor.loader.check_consistent_history(connection)

检查所有依赖是否已经执行完成;

def check_consistent_history(self, connection):    """    Raise InconsistentMigrationHistory if any applied migrations have    unapplied dependencies.    """    recorder = MigrationRecorder(connection)    applied = recorder.applied_migrations()                             # 获取django_migrations中已经执行的migrations    for migration in applied:                                           # 依次遍历已经执行的migrations        # If the migration is unknown, skip it.        if migration not in self.graph.nodes:                           # 如果不在graph.nodes中则跳过            continue        for parent in self.graph.node_map[migration].parents:           # 获取migration的parents            if parent not in applied:                                   # 如果parent不在已经执行的列表中                # Skip unapplied squashed migrations that have all of their                # `replaces` applied.                if parent in self.replacements:                         # 如果parent在replace列表中                    if all(m in applied for m in self.replacements[parent].replaces):   # 如果替代的replace的replaces都在已经执行过的列表中则继续下一个循环                        continue                                                                                raise InconsistentMigrationHistory(                    "Migration {}.{} is applied before its dependency "                    "{}.{} on database '{}'.".format(                        migration[0], migration[1], parent[0], parent[1],                        connection.alias,                    )                )                                                       # 报错

在检查完依赖后,继续执行;

conflicts = executor.loader.detect_conflicts()

通过该函数,找出具体的app和migration_name冲突的信息;

def detect_conflicts(self):    """    Looks through the loaded graph and detects any conflicts - apps    with more than one leaf migration. Returns a dict of the app labels    that conflict with the migration names that conflict.    """    seen_apps = {}    conflicting_apps = set()    for app_label, migration_name in self.graph.leaf_nodes():               # 获取所有的子节点        if app_label in seen_apps:                                          # 如果已经在seen_apps中            conflicting_apps.add(app_label)                                 # 添加到conflicting_apps集合中        seen_apps.setdefault(app_label, set()).add(migration_name)          # 设置到seen_apps中    return {app_label: seen_apps[app_label] for app_label in conflicting_apps}  # 遍历conflicting_apps并按照app: migration_name设置值

然后,在冲突检查完成后,根据传入的参数值,如app migration_name,app和不传值的方式,获取targets,然后获取plan,再根据配置打印相关信息后,在生成projectstate后调用;

post_migrate_state = executor.migrate(        targets, plan=plan, state=pre_migrate_state.clone(), fake=fake,        fake_initial=fake_initial,    )

执行相关数据库的执行操作,

def migrate(self, targets, plan=None, state=None, fake=False, fake_initial=False):    """    Migrates the database up to the given targets.    Django first needs to create all project states before a migration is    (un)applied and in a second step run all the database operations.    """    if plan is None:                                                                    # 如果传入的plan为none        plan = self.migration_plan(targets)                                             # 根据targets生成plan    # Create the forwards plan Django would follow on an empty database    full_plan = self.migration_plan(self.loader.graph.leaf_nodes(), clean_start=True)   # 根据所有的子节点生成plan    all_forwards = all(not backwards for mig, backwards in plan)                        # 是否所有的backwards都是False    all_backwards = all(backwards for mig, backwards in plan)                           # 是否所有的backwards为True    if not plan:                                                                        # 如果生成的为None        if state is None:                                                               # state传入为None            # The resulting state should include applied migrations.            state = self._create_project_state(with_applied_migrations=True)            # 生成一个state    elif all_forwards == all_backwards:                                                 # 如果两个相同        # This should only happen if there's a mixed plan        raise InvalidMigrationPlan(            "Migration plans with both forwards and backwards migrations "            "are not supported. Please split your migration process into "            "separate plans of only forwards OR backwards migrations.",            plan        )    elif all_forwards:                                                                  # 如果都是True        if state is None:            # The resulting state should still include applied migrations.            state = self._create_project_state(with_applied_migrations=True)        state = self._migrate_all_forwards(state, plan, full_plan, fake=fake, fake_initial=fake_initial)    else:        # No need to check for `elif all_backwards` here, as that condition        # would always evaluate to true.        state = self._migrate_all_backwards(plan, full_plan, fake=fake)                 # 调用_migrate_all_backwards执行    self.check_replacements()                                                           # 检查replacements的值    return state

当前仅分析创建数据库的backwards的操作,即调用_migrate_all_forwards的操作;

def _migrate_all_forwards(self, state, plan, full_plan, fake, fake_initial):    """    Take a list of 2-tuples of the form (migration instance, False) and    apply them in the order they occur in the full_plan.    """    migrations_to_run = {m[0] for m in plan}                                        # 获取需要执行的migration    for migration, _ in full_plan:        if not migrations_to_run:            # We remove every migration that we applied from these sets so            # that we can bail out once the last migration has been applied            # and don't always run until the very end of the migration            # process.            break        if migration in migrations_to_run:            if 'apps' not in state.__dict__:                if self.progress_callback:                    self.progress_callback("render_start")                state.apps  # Render all -- performance critical                if self.progress_callback:                    self.progress_callback("render_success")            state = self.apply_migration(state, migration, fake=fake, fake_initial=fake_initial)    # 执行migration            migrations_to_run.remove(migration)                                                     # 从待执行列表中移除    return state

此时就会调用apply_migration对数据库进行相关操作,方法如下;

def apply_migration(self, state, migration, fake=False, fake_initial=False):    """    Runs a migration forwards.    """     if self.progress_callback:                                                      # 检查是否有回调函数        self.progress_callback("apply_start", migration, fake)                      # 有则执行回调函数    if not fake:        if fake_initial:            # Test to see if this is an already-applied initial migration            applied, state = self.detect_soft_applied(state, migration)            if applied:                fake = True        if not fake:            # Alright, do it normally            with self.connection.schema_editor(atomic=migration.atomic) as schema_editor:                state = migration.apply(state, schema_editor)                       # 调用migration.apply执行    # For replacement migrations, record individual statuses    if migration.replaces:                                                          # 是否有replaces        for app_label, name in migration.replaces:             self.recorder.record_applied(app_label, name)                           # 记录已经执行的record中    else:        self.recorder.record_applied(migration.app_label, migration.name)               # Report progress    if self.progress_callback:        self.progress_callback("apply_success", migration, fake)    return state

假如有如下的migrations文件;

class Migration(migrations.Migration):    initial = True    dependencies = [    ]    operations = [        migrations.CreateModel(            name='Authorization',            fields=[                ('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),                ('create_time', models.DateTimeField(auto_now_add=True)),                ('owner_state', models.IntegerField(choices=[(2, '同意'), (3, '拒绝')], default=1)),                ('reciever_state', models.IntegerField(choices=[(2, '同意'), (3, '拒绝')], default=2)),            ],        ),    ]

此时的migration.apply就是调用了的如下方法;

def apply(self, project_state, schema_editor, collect_sql=False):    """    Takes a project_state representing all migrations prior to this one    and a schema_editor for a live database and applies the migration    in a forwards order.    Returns the resulting project state for efficient re-use by following    Migrations.    """    for operation in self.operations:                                           # 遍历所有的操作        # If this operation cannot be represented as SQL, place a comment        # there instead        if collect_sql:             schema_editor.collected_sql.append("--")            if not operation.reduces_to_sql:                schema_editor.collected_sql.append(                    "-- MIGRATION NOW PERFORMS OPERATION THAT CANNOT BE WRITTEN AS SQL:"                )            schema_editor.collected_sql.append("-- %s" % operation.describe())            schema_editor.collected_sql.append("--")            if not operation.reduces_to_sql:                continue        # Save the state before the operation has run        old_state = project_state.clone()                                       # 获取当前state        operation.state_forwards(self.app_label, project_state)                 # 调用operation的state_forwards        # Run the operation        atomic_operation = operation.atomic or (self.atomic and operation.atomic is not False)        if not schema_editor.atomic_migration and atomic_operation:            # Force a transaction on a non-transactional-DDL backend or an            # atomic operation inside a non-atomic migration.            with atomic(schema_editor.connection.alias):                operation.database_forwards(self.app_label, schema_editor, old_state, project_state)    # 调用operation.database_forwards方法执行        else:            # Normal behaviour            operation.database_forwards(self.app_label, schema_editor, old_state, project_state)    return project_state                                                        # 返回状态

此时就是调用了database_forwards来创建数据库,此时就是调用migrations.CreateModel的database_forwards方法来创建数据库,

def database_forwards(self, app_label, schema_editor, from_state, to_state):    model = to_state.apps.get_model(app_label, self.name)    if self.allow_migrate_model(schema_editor.connection.alias, model):        schema_editor.create_model(model)

调用了MySQLGISSchemaEditor的create_model方法,此时就是真正的执行create_model的方法,通过基类BaseDatabaseSchemaEditor的create_model方法生成model,组合model的信息创建相应的键值;

def create_model(self, model):    """    Takes a model and creates a table for it in the database.    Will also create any accompanying indexes or unique constraints.    """    # Create column SQL, add FK deferreds if needed    column_sqls = []    params = []    for field in model._meta.local_fields:        # SQL        definition, extra_params = self.column_sql(model, field)        if definition is None:            continue        # Check constraints can go on the column SQL here        db_params = field.db_parameters(connection=self.connection)        if db_params['check']:            definition += " CHECK (%s)" % db_params['check']        # Autoincrement SQL (for backends with inline variant)        col_type_suffix = field.db_type_suffix(connection=self.connection)        if col_type_suffix:            definition += " %s" % col_type_suffix        params.extend(extra_params)        # FK        if field.remote_field and field.db_constraint:            to_table = field.remote_field.model._meta.db_table            to_column = field.remote_field.model._meta.get_field(field.remote_field.field_name).column            if self.connection.features.supports_foreign_keys:                self.deferred_sql.append(self._create_fk_sql(model, field, "_fk_%(to_table)s_%(to_column)s"))            elif self.sql_create_inline_fk:                definition += " " + self.sql_create_inline_fk % {                    "to_table": self.quote_name(to_table),                    "to_column": self.quote_name(to_column),                }        # Add the SQL to our big list        column_sqls.append("%s %s" % (            self.quote_name(field.column),            definition,        ))        # Autoincrement SQL (for backends with post table definition variant)        if field.get_internal_type() in ("AutoField", "BigAutoField"):            autoinc_sql = self.connection.ops.autoinc_sql(model._meta.db_table, field.column)            if autoinc_sql:                self.deferred_sql.extend(autoinc_sql)    # Add any unique_togethers (always deferred, as some fields might be    # created afterwards, like geometry fields with some backends)    for fields in model._meta.unique_together:        columns = [model._meta.get_field(field).column for field in fields]        self.deferred_sql.append(self._create_unique_sql(model, columns))    # Make the table    sql = self.sql_create_table % {        "table": self.quote_name(model._meta.db_table),        "definition": ", ".join(column_sqls)    }    if model._meta.db_tablespace:        tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace)        if tablespace_sql:            sql += ' ' + tablespace_sql    # Prevent using [] as params, in the case a literal '%' is used in the definition    self.execute(sql, params or None)    # Add any field index and index_together's (deferred as SQLite3 _remake_table needs it)    self.deferred_sql.extend(self._model_indexes_sql(model))    # Make M2M tables    for field in model._meta.local_many_to_many:        if field.remote_field.through._meta.auto_created:            self.create_model(field.remote_field.through)

这就是最终执行的结果,大家有兴趣可自行进行更为细致的学习,至此migrate的大致的生成流程就分析完成。

总结

Django的migrate的命令相对繁琐,其中由于django兼顾不同的数据库,所有在数据库中进行了大量的封装,对不同数据库的不同特性需要进行兼容,django的orm的相关部分内容相对比较繁琐和复杂,本文也只是很浅显的分析了有关其中最简单的migrate的生成情况的分析,其中难免忽略了其他的大量细节内容,大家有兴趣课自行阅读,如本文中有所疏漏,请批评指正。

转载地址:https://blog.csdn.net/qq_33339479/article/details/82056499 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Django源码分析8:单元测试test命令浅析
下一篇:Python3.5源码分析-Dict概述

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月17日 08时10分58秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

jquery查找div下第一个input_jquery查找div元素第一个元素id 2019-04-21
如何修改手机屏幕显示的长宽比例_屏幕分辨率 尺寸 比例 长宽 如何计算 2019-04-21
mysql 的版本 命名规则_MySQL版本和命名规则 2019-04-21
no java stack_Java Stack contains()用法及代码示例 2019-04-21
java动态代码_Java Agent入门学习之动态修改代码 2019-04-21
python集合如何去除重复数据_Python 迭代删除重复项,集合删除重复项 2019-04-21
iview 自定义时间选择器组件_Vue.js中使用iView日期选择器并设置开始时间结束时间校验功能... 2019-04-21
java 验证码校验_JavaWeb验证码校验功能代码实例 2019-04-21
java多线程初学者指南_Java多线程初学者指南(4):线程的生命周期 2019-04-21
java进程user是jenkins_java 学习:在java中启动其他应用,由jenkins想到的 2019-04-21
java添加资源文件_如何在eclipse中将资源文件夹添加到我的Java项目中 2019-04-21
java的三种修饰符_3分钟弄明白JAVA三大修饰符 2019-04-21
mysql source skip_redis mysql 中的跳表(skip list) 查找树(btree) 2019-04-21
java sun.org.mozilla_maven编译找不到符号 sun.org.mozilla.javascript.internal 2019-04-21
php curl 输出到文件,PHP 利用CURL(HTTP)实现服务器上传文件至另一服务器 2019-04-21
PHP字符串运算结果,PHP运算符(二)"字符串运算符"实例详解 2019-04-21
PHP实现 bcrypt,如何使php中的bcrypt和Java中的jbcrypt兼容 2019-04-21
php8安全,PHP八大安全函数解析 2019-04-21
php基础语法了解和熟悉的表现,PHP第二课 了解PHP的基本语法以及目录结构 2019-04-21
matlab中lag函数用法,MATLAB movavg函数用法 2019-04-21