A Quick Look at the Execution Flow of pip install
Published: 2021-07-25 13:05:00 · Views: 15 · Category: Technical Articles


Overview of pip

pip is the package management tool that ships with Python. It provides search, download, install, and uninstall functionality for Python packages, and is currently the mainstream package manager in the Python ecosystem.

Overview of how pip downloads packages

At its core, pip sends a network request to a server for a file matching the requested name, then unpacks it into the local Python library folder, thereby downloading the remote package and installing it locally. The rough flow is as follows:

Taking `pip install django` as an example:

1. Obtain the remote index server URL, e.g. http://mirrors.aliyun.com/pypi/simple/
2. Determine the local library installation path, and look up the django package on the index server
3. Download the matching package to the local machine
4. Unpack the package into Python's site-packages folder, then check whether it depends on other packages; if so, install those as well
5. Each dependency is installed following the same flow; once all of them finish, the installation is complete
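The lookup in step 2 can be sketched in miniature. The helper below parses a PEP 503 "simple" index page and picks out candidate files for a project; both the helper and the sample HTML are hypothetical illustrations, not pip's actual code.

```python
import re
from html.parser import HTMLParser

# Hypothetical helper: collect the distribution links advertised by a
# PEP 503 "simple" index page (step 2 of the flow above).
class LinkCollector(HTMLParser):
    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href":
                    self.links.append(value)

def find_candidates(index_html, project):
    """Return links whose filename starts with the project name."""
    parser = LinkCollector()
    parser.feed(index_html)
    pattern = re.compile(r"^%s-" % re.escape(project), re.IGNORECASE)
    return [link for link in parser.links
            if pattern.match(link.rsplit("/", 1)[-1])]

# Made-up example index page for "django":
page = (
    '<a href="Django-2.1.7-py3-none-any.whl">Django-2.1.7-py3-none-any.whl</a>'
    '<a href="Django-2.1.7.tar.gz">Django-2.1.7.tar.gz</a>'
)
print(find_candidates(page, "django"))
```

pip then downloads one of these candidates and unpacks it (steps 3 and 4); real index pages also carry hash fragments and `data-requires-python` attributes, which this sketch ignores.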

The initialization flow of pip install

Since pip installation essentially requests files from a server via URL and downloads them, pip uses the requests library for networking; it is bundled inside pip's _vendor folder rather than installed as a separate requests package.

For this walkthrough, assume Django is not yet installed in the Python environment and we run pip install django. The function entry point is:

pip = pip._internal:main
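This mapping comes from pip's packaging metadata: the `pip` console script is declared as a `console_scripts` entry point. The fragment below is a paraphrased sketch of that declaration (not pip's verbatim setup.py), along with how the `name=module:attr` syntax splits apart:

```python
# Paraphrased sketch of pip's console-script declaration: installing the
# package generates a `pip` launcher that imports pip._internal and
# calls its main() function.
entry_points = {
    "console_scripts": [
        "pip=pip._internal:main",
    ],
}

# The "name=module:attr" syntax splits into a script name, a module to
# import, and an attribute of that module to call:
script, target = entry_points["console_scripts"][0].split("=")
module, attr = target.split(":")
print(script, module, attr)   # pip pip._internal main
```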

Let's look at the main function:

```python
def main(args=None):
    if args is None:
        args = sys.argv[1:]                 # get the command-line arguments

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parse_command(args)   # parse the command-line arguments
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip._internal.utils.encoding.auto_decode
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as e:
        # setlocale can apparently crash if locale are uninitialized
        logger.debug("Ignoring error %s when setting locale", e)
    command = commands_dict[cmd_name](isolated=("--isolated" in cmd_args))   # look up the command class; here it is install
    return command.main(cmd_args)                                            # run that command's main method
```

Now look up the install command in commands_dict: it maps to InstallCommand. From the inheritance chain, InstallCommand derives from RequirementCommand, which in turn derives from Command, so command.main executes the main method of the Command class:

```python
def main(self, args):
    # type: (List[str]) -> int
    options, args = self.parse_args(args)               # parse the arguments

    # Set verbosity so that it can be used elsewhere.
    self.verbosity = options.verbose - options.quiet

    level_number = setup_logging(
        verbosity=self.verbosity,
        no_color=options.no_color,
        user_log_file=options.log,
    )                                                   # set the logging level

    if sys.version_info[:2] == (3, 4):                  # running on Python 3.4?
        deprecated(
            "Python 3.4 support has been deprecated. pip 19.1 will be the "
            "last one supporting it. Please upgrade your Python as Python "
            "3.4 won't be maintained after March 2019 (cf PEP 429).",
            replacement=None,
            gone_in='19.2',
        )
    elif sys.version_info[:2] == (2, 7):                # running on Python 2.7?
        message = (
            "A future version of pip will drop support for Python 2.7."
        )
        if platform.python_implementation() == "CPython":
            message = (
                "Python 2.7 will reach the end of its life on January "
                "1st, 2020. Please upgrade your Python as Python 2.7 "
                "won't be maintained after that date. "
            ) + message
        deprecated(message, replacement=None, gone_in=None)

    # TODO: Try to get these passing down from the command?
    #       without resorting to os.environ to hold these.
    #       This also affects isolated builds and it should.
    if options.no_input:
        os.environ['PIP_NO_INPUT'] = '1'

    if options.exists_action:
        os.environ['PIP_EXISTS_ACTION'] = ' '.join(options.exists_action)

    if options.require_venv and not self.ignore_require_venv:
        # If a venv is required check if it can really be found
        if not running_under_virtualenv():
            logger.critical(
                'Could not find an activated virtualenv (required).'
            )
            sys.exit(VIRTUALENV_NOT_FOUND)

    try:
        status = self.run(options, args)                # call the run method implemented by the subclass
        # FIXME: all commands should return an exit status
        # and when it is done, isinstance is not needed anymore
        if isinstance(status, int):
            return status
    except PreviousBuildDirError as exc:                # error-handling paths
        logger.critical(str(exc))
        logger.debug('Exception information:', exc_info=True)
        return PREVIOUS_BUILD_DIR_ERROR
    except (InstallationError, UninstallationError, BadCommand) as exc:
        logger.critical(str(exc))
        logger.debug('Exception information:', exc_info=True)
        return ERROR
    except CommandError as exc:
        logger.critical('ERROR: %s', exc)
        logger.debug('Exception information:', exc_info=True)
        return ERROR
    except BrokenStdoutLoggingError:
        # Bypass our logger and write any remaining messages to stderr
        # because stdout no longer works.
        print('ERROR: Pipe to stdout was broken', file=sys.stderr)
        if level_number <= logging.DEBUG:
            traceback.print_exc(file=sys.stderr)
        return ERROR
    except KeyboardInterrupt:
        logger.critical('Operation cancelled by user')
        logger.debug('Exception information:', exc_info=True)
        return ERROR
    except BaseException:
        logger.critical('Exception:', exc_info=True)
        return UNKNOWN_ERROR
    finally:
        allow_version_check = (
            # Does this command have the index_group options?
            hasattr(options, "no_index") and
            # Is this command allowed to perform this check?
            not (options.disable_pip_version_check or options.no_index)
        )
        # Check if we're using the latest version of pip available
        if allow_version_check:
            session = self._build_session(
                options,
                retries=0,
                timeout=min(5, options.timeout)
            )
            with session:
                pip_version_check(session, options)
        # Shutdown the logging module
        logging.shutdown()

    return SUCCESS
```
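This structure is the classic template-method pattern: the base Command class owns argument parsing, logging, and error handling, while each subclass supplies the actual work in run(). A stripped-down sketch of the same shape (hypothetical classes, not pip's real code):

```python
# Minimal template-method sketch: the base class drives the lifecycle,
# the subclass only implements run().
SUCCESS, ERROR = 0, 1

class Command:
    def main(self, args):
        try:
            status = self.run(args)          # subclass hook
            return status if isinstance(status, int) else SUCCESS
        except Exception as exc:
            print("ERROR:", exc)
            return ERROR

class InstallCommand(Command):
    def run(self, args):
        print("installing", *args)
        return SUCCESS

print(InstallCommand().main(["django"]))   # 0
```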

Next, look at the run method of the InstallCommand class:

```python
def run(self, options, args):
    cmdoptions.check_install_build_global(options)      # validate the build-option input
    upgrade_strategy = "to-satisfy-only"
    if options.upgrade:
        upgrade_strategy = options.upgrade_strategy

    if options.build_dir:
        options.build_dir = os.path.abspath(options.build_dir)

    cmdoptions.check_dist_restriction(options, check_target=True)

    if options.python_version:
        python_versions = [options.python_version]      # target Python version
    else:
        python_versions = None

    options.src_dir = os.path.abspath(options.src_dir)  # resolve the src_dir path
    install_options = options.install_options or []     # install options
    if options.use_user_site:
        if options.prefix_path:
            raise CommandError(
                "Can not combine '--user' and '--prefix' as they imply "
                "different installation locations"
            )
        if virtualenv_no_global():
            raise InstallationError(
                "Can not perform a '--user' install. User site-packages "
                "are not visible in this virtualenv."
            )
        install_options.append('--user')
        install_options.append('--prefix=')

    target_temp_dir = TempDirectory(kind="target")
    if options.target_dir:
        options.ignore_installed = True
        options.target_dir = os.path.abspath(options.target_dir)
        if (os.path.exists(options.target_dir) and not
                os.path.isdir(options.target_dir)):
            raise CommandError(
                "Target path exists but is not a directory, will not "
                "continue."
            )

        # Create a target directory for using with the target option
        target_temp_dir.create()
        install_options.append('--home=' + target_temp_dir.path)

    global_options = options.global_options or []

    with self._build_session(options) as session:       # new session wrapping a requests Session
        finder = self._build_package_finder(
            options=options,
            session=session,
            platform=options.platform,
            python_versions=python_versions,
            abi=options.abi,
            implementation=options.implementation,
        )                                               # a PackageFinder instance that can locate packages
        build_delete = (not (options.no_clean or options.build_dir))
        wheel_cache = WheelCache(options.cache_dir, options.format_control)

        if options.cache_dir and not check_path_owner(options.cache_dir):
            logger.warning(
                "The directory '%s' or its parent directory is not owned "
                "by the current user and caching wheels has been "
                "disabled. check the permissions and owner of that "
                "directory. If executing pip with sudo, you may want "
                "sudo's -H flag.",
                options.cache_dir,
            )
            options.cache_dir = None

        with RequirementTracker() as req_tracker, TempDirectory(
            options.build_dir, delete=build_delete, kind="install"
        ) as directory:
            requirement_set = RequirementSet(
                require_hashes=options.require_hashes,
                check_supported_wheels=not options.target_dir,
            )                                           # build a RequirementSet instance

            try:
                self.populate_requirement_set(
                    requirement_set, args, options, finder, session,
                    self.name, wheel_cache
                )                                       # turn the input arguments into the RequirementSet
                preparer = RequirementPreparer(
                    build_dir=directory.path,
                    src_dir=options.src_dir,
                    download_dir=None,
                    wheel_download_dir=None,
                    progress_bar=options.progress_bar,
                    build_isolation=options.build_isolation,
                    req_tracker=req_tracker,
                )                                       # build a preparer instance

                resolver = Resolver(
                    preparer=preparer,
                    finder=finder,
                    session=session,
                    wheel_cache=wheel_cache,
                    use_user_site=options.use_user_site,
                    upgrade_strategy=upgrade_strategy,
                    force_reinstall=options.force_reinstall,
                    ignore_dependencies=options.ignore_dependencies,
                    ignore_requires_python=options.ignore_requires_python,
                    ignore_installed=options.ignore_installed,
                    isolated=options.isolated_mode,
                    use_pep517=options.use_pep517
                )                                       # build a resolver instance
                resolver.resolve(requirement_set)       # resolve and download the packages

                protect_pip_from_modification_on_windows(
                    modifying_pip=requirement_set.has_requirement("pip")
                )

                # Consider legacy and PEP517-using requirements separately
                legacy_requirements = []
                pep517_requirements = []
                for req in requirement_set.requirements.values():
                    if req.use_pep517:
                        pep517_requirements.append(req)
                    else:
                        legacy_requirements.append(req)

                # We don't build wheels for legacy requirements if we
                # don't have wheel installed or we don't have a cache dir
                try:
                    import wheel  # noqa: F401
                    build_legacy = bool(options.cache_dir)
                except ImportError:
                    build_legacy = False

                wb = WheelBuilder(
                    finder, preparer, wheel_cache,
                    build_options=[], global_options=[],
                )

                # Always build PEP 517 requirements
                build_failures = wb.build(
                    pep517_requirements,
                    session=session, autobuilding=True
                )

                if build_legacy:
                    # We don't care about failures building legacy
                    # requirements, as we'll fall through to a direct
                    # install for those.
                    wb.build(
                        legacy_requirements,
                        session=session, autobuilding=True
                    )

                # If we're using PEP 517, we cannot do a direct install
                # so we fail here.
                if build_failures:
                    raise InstallationError(
                        "Could not build wheels for {} which use"
                        " PEP 517 and cannot be installed directly".format(
                            ", ".join(r.name for r in build_failures)))

                to_install = resolver.get_installation_order(
                    requirement_set
                )                                       # order the requirements for installation

                # Consistency Checking of the package set we're installing.
                should_warn_about_conflicts = (
                    not options.ignore_dependencies and
                    options.warn_about_conflicts
                )
                if should_warn_about_conflicts:
                    self._warn_about_conflicts(to_install)

                # Don't warn about script install locations if
                # --target has been specified
                warn_script_location = options.warn_script_location
                if options.target_dir:
                    warn_script_location = False

                installed = install_given_reqs(
                    to_install,
                    install_options,
                    global_options,
                    root=options.root_path,
                    home=target_temp_dir.path,
                    prefix=options.prefix_path,
                    pycompile=options.compile,
                    warn_script_location=warn_script_location,
                    use_user_site=options.use_user_site,
                )                                       # install the prepared packages

                lib_locations = get_lib_location_guesses(
                    user=options.use_user_site,
                    home=target_temp_dir.path,
                    root=options.root_path,
                    prefix=options.prefix_path,
                    isolated=options.isolated_mode,
                )
                working_set = pkg_resources.WorkingSet(lib_locations)

                reqs = sorted(installed, key=operator.attrgetter('name'))
                items = []
                for req in reqs:
                    item = req.name
                    try:
                        installed_version = get_installed_version(
                            req.name, working_set=working_set
                        )
                        if installed_version:
                            item += '-' + installed_version
                    except Exception:
                        pass
                    items.append(item)
                installed = ' '.join(items)
                if installed:
                    logger.info('Successfully installed %s', installed)   # report the installed modules
            except EnvironmentError as error:
                show_traceback = (self.verbosity >= 1)
                message = create_env_error_message(
                    error, show_traceback, options.use_user_site,
                )
                logger.error(message, exc_info=show_traceback)
                return ERROR
            except PreviousBuildDirError:
                options.no_clean = True
                raise
            finally:
                # Clean up
                if not options.no_clean:
                    requirement_set.cleanup_files()
                    wheel_cache.cleanup()

    if options.target_dir:
        self._handle_target_dir(
            options.target_dir, target_temp_dir, options.upgrade
        )
    return requirement_set
```

The main download-related call here is resolver.resolve(requirement_set), whose code is as follows:

```python
def resolve(self, requirement_set):
    # type: (RequirementSet) -> None
    """Resolve what operations need to be done

    As a side-effect of this method, the packages (and their dependencies)
    are downloaded, unpacked and prepared for installation. This
    preparation is done by ``pip.operations.prepare``.

    Once PyPI has static dependency metadata available, it would be
    possible to move the preparation to become a step separated from
    dependency resolution.
    """
    # make the wheelhouse
    if self.preparer.wheel_download_dir:
        ensure_dir(self.preparer.wheel_download_dir)

    # If any top-level requirement has a hash specified, enter
    # hash-checking mode, which requires hashes from all.
    root_reqs = (
        requirement_set.unnamed_requirements +
        list(requirement_set.requirements.values())
    )
    self.require_hashes = (
        requirement_set.require_hashes or
        any(req.has_hash_options for req in root_reqs)
    )

    # Display where finder is looking for packages
    locations = self.finder.get_formatted_locations()   # the index server URLs to download from
    if locations:
        logger.info(locations)

    # Actually prepare the files, and collect any exceptions. Most hash
    # exceptions cannot be checked ahead of time, because
    # req.populate_link() needs to be called before we can make decisions
    # based on link type.
    discovered_reqs = []  # type: List[InstallRequirement]
    hash_errors = HashErrors()
    for req in chain(root_reqs, discovered_reqs):
        try:
            discovered_reqs.extend(
                self._resolve_one(requirement_set, req)   # issue the request to fetch this module
            )
        except HashError as exc:
            exc.req = req
            hash_errors.append(exc)

    if hash_errors:
        raise hash_errors
```
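One detail worth highlighting in resolve: the loop `for req in chain(root_reqs, discovered_reqs)` keeps visiting the items that _resolve_one appends to discovered_reqs mid-iteration, because chain reads the second list lazily. That turns a plain for loop into a work queue over the whole dependency graph. A toy demonstration of the idiom (made-up dependency table, not pip's data):

```python
from itertools import chain

# Hypothetical dependency table standing in for package metadata.
DEPS = {"django": ["pytz", "sqlparse"], "pytz": [], "sqlparse": []}

def resolve(root):
    root_reqs = [root]
    discovered = []          # grows while we iterate over it
    seen = set()
    for req in chain(root_reqs, discovered):
        if req in seen:
            continue
        seen.add(req)
        discovered.extend(DEPS[req])   # newly found deps are visited too
    return seen

print(sorted(resolve("django")))   # ['django', 'pytz', 'sqlparse']
```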

Continue into the _resolve_one function:

```python
def _resolve_one(
    self,
    requirement_set,  # type: RequirementSet
    req_to_install  # type: InstallRequirement
):
    # type: (...) -> List[InstallRequirement]
    """Prepare a single requirements file.

    :return: A list of additional InstallRequirements to also install.
    """
    # Tell user what we are doing for this requirement:
    # obtain (editable), skipping, processing (local url), collecting
    # (remote url or package name)
    if req_to_install.constraint or req_to_install.prepared:
        return []

    req_to_install.prepared = True

    # register tmp src for cleanup in case something goes wrong
    requirement_set.reqs_to_cleanup.append(req_to_install)

    abstract_dist = self._get_abstract_dist_for(req_to_install)     # download this module

    # Parse and return dependencies
    dist = abstract_dist.dist()                                     # get the dependency metadata
    try:
        check_dist_requires_python(dist)
    except UnsupportedPythonVersion as err:
        if self.ignore_requires_python:
            logger.warning(err.args[0])
        else:
            raise

    more_reqs = []  # type: List[InstallRequirement]

    def add_req(subreq, extras_requested):
        sub_install_req = install_req_from_req_string(
            str(subreq),
            req_to_install,
            isolated=self.isolated,
            wheel_cache=self.wheel_cache,
            use_pep517=self.use_pep517
        )
        parent_req_name = req_to_install.name
        to_scan_again, add_to_parent = requirement_set.add_requirement(
            sub_install_req,
            parent_req_name=parent_req_name,
            extras_requested=extras_requested,
        )
        if parent_req_name and add_to_parent:
            self._discovered_dependencies[parent_req_name].append(
                add_to_parent
            )
        more_reqs.extend(to_scan_again)

    with indent_log():                                              # collect the dependencies
        # We add req_to_install before its dependencies, so that we
        # can refer to it when adding dependencies.
        if not requirement_set.has_requirement(req_to_install.name):
            # 'unnamed' requirements will get added here
            req_to_install.is_direct = True
            requirement_set.add_requirement(
                req_to_install, parent_req_name=None,
            )

        if not self.ignore_dependencies:
            if req_to_install.extras:
                logger.debug(
                    "Installing extra requirements: %r",
                    ','.join(req_to_install.extras),
                )
            missing_requested = sorted(
                set(req_to_install.extras) - set(dist.extras)
            )
            for missing in missing_requested:
                logger.warning(
                    '%s does not provide the extra \'%s\'',
                    dist, missing
                )

            available_requested = sorted(
                set(dist.extras) & set(req_to_install.extras)
            )
            for subreq in dist.requires(available_requested):
                add_req(subreq, extras_requested=available_requested)

        if not req_to_install.editable and not req_to_install.satisfied_by:
            # XXX: --no-install leads this to report 'Successfully
            # downloaded' for only non-editable reqs, even though we took
            # action on them.
            requirement_set.successfully_downloaded.append(req_to_install)

    return more_reqs
```

Next, let's see how _get_abstract_dist_for performs the download:

```python
def _get_abstract_dist_for(self, req):
    # type: (InstallRequirement) -> DistAbstraction
    """Takes a InstallRequirement and returns a single AbstractDist \
    representing a prepared variant of the same.
    """
    ...
    # satisfied_by is only evaluated by calling _check_skip_installed,
    # so it must be None here.
    assert req.satisfied_by is None
    skip_reason = self._check_skip_installed(req)       # check whether it is already installed

    if req.satisfied_by:
        return self.preparer.prepare_installed_requirement(
            req, self.require_hashes, skip_reason
        )

    upgrade_allowed = self._is_upgrade_allowed(req)
    abstract_dist = self.preparer.prepare_linked_requirement(
        req, self.session, self.finder, upgrade_allowed,
        self.require_hashes
    )                                                   # download the package
    ...
    return abstract_dist
```

The prepare_linked_requirement function looks like this:

```python
def prepare_linked_requirement(
    self,
    req,  # type: InstallRequirement
    session,  # type: PipSession
    finder,  # type: PackageFinder
    upgrade_allowed,  # type: bool
    require_hashes  # type: bool
):
    # type: (...) -> DistAbstraction
    """Prepare a requirement that would be obtained from req.link
    """
    # TODO: Breakup into smaller functions
    if req.link and req.link.scheme == 'file':
        path = url_to_path(req.link.url)
        logger.info('Processing %s', display_path(path))
    else:
        logger.info('Collecting %s', req)

    with indent_log():
        # @@ if filesystem packages are not marked
        # editable in a req, a non deterministic error
        # occurs when the script attempts to unpack the
        # build directory
        req.ensure_has_source_dir(self.build_dir)
        # If a checkout exists, it's unwise to keep going.  version
        # inconsistencies are logged later, but do not fail the
        # installation.
        # FIXME: this won't upgrade when there's an existing
        # package unpacked in `req.source_dir`
        if os.path.exists(os.path.join(req.source_dir, 'setup.py')):
            raise PreviousBuildDirError(
                "pip can't proceed with requirements '%s' due to a"
                " pre-existing build directory (%s). This is "
                "likely due to a previous installation that failed"
                ". pip is being responsible and not assuming it "
                "can delete this. Please delete it and try again."
                % (req, req.source_dir)
            )
        req.populate_link(finder, upgrade_allowed, require_hashes)      # resolve the download link
        # We can't hit this spot and have populate_link return None.
        # req.satisfied_by is None here (because we're
        # guarded) and upgrade has no impact except when satisfied_by
        # is not None.
        # Then inside find_requirement existing_applicable -> False
        # If no new versions are found, DistributionNotFound is raised,
        # otherwise a result is guaranteed.
        assert req.link
        link = req.link

        # Now that we have the real link, we can tell what kind of
        # requirements we have and raise some more informative errors
        # than otherwise. (For example, we can raise VcsHashUnsupported
        # for a VCS URL rather than HashMissing.)
        if require_hashes:
            # We could check these first 2 conditions inside
            # unpack_url and save repetition of conditions, but then
            # we would report less-useful error messages for
            # unhashable requirements, complaining that there's no
            # hash provided.
            if is_vcs_url(link):
                raise VcsHashUnsupported()
            elif is_file_url(link) and is_dir_url(link):
                raise DirectoryUrlHashUnsupported()
            if not req.original_link and not req.is_pinned:
                # Unpinned packages are asking for trouble when a new
                # version is uploaded. This isn't a security check, but
                # it saves users a surprising hash mismatch in the
                # future.
                #
                # file:/// URLs aren't pinnable, so don't complain
                # about them not being pinned.
                raise HashUnpinned()

        hashes = req.hashes(trust_internet=not require_hashes)
        if require_hashes and not hashes:
            # Known-good hashes are missing for this requirement, so
            # shim it with a facade object that will provoke hash
            # computation and then raise a HashMissing exception
            # showing the user what the hash should be.
            hashes = MissingHashes()

        try:
            download_dir = self.download_dir                # the directory to download into
            # We always delete unpacked sdists after pip ran.
            autodelete_unpacked = True
            if req.link.is_wheel and self.wheel_download_dir:
                # when doing 'pip wheel` we download wheels to a
                # dedicated dir.
                download_dir = self.wheel_download_dir
            if req.link.is_wheel:
                if download_dir:
                    # When downloading, we only unpack wheels to get
                    # metadata.
                    autodelete_unpacked = True
                else:
                    # When installing a wheel, we use the unpacked
                    # wheel.
                    autodelete_unpacked = False
            unpack_url(
                req.link, req.source_dir,
                download_dir, autodelete_unpacked,
                session=session, hashes=hashes,
                progress_bar=self.progress_bar
            )                                               # download and unpack the files
        except requests.HTTPError as exc:
            logger.critical(
                'Could not install requirement %s because of error %s',
                req,
                exc,
            )
            raise InstallationError(
                'Could not install requirement %s because of HTTP '
                'error %s for URL %s' %
                (req, exc, req.link)
            )
        abstract_dist = make_abstract_dist(req)
        with self.req_tracker.track(req):
            abstract_dist.prep_for_dist(finder, self.build_isolation)
        if self._download_should_save:                      # persist the downloaded content
            # Make a .zip of the source_dir we already created.
            if req.link.scheme in vcs.all_schemes:
                req.archive(self.download_dir)
    return abstract_dist
```
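The require_hashes branch above ultimately compares a downloaded file's digest against known-good values. A simplified stand-in for that check, using hashlib directly (pip's real implementation is the Hashes class in pip._internal.utils.hashes):

```python
import hashlib

def check_hash(data, algorithm, expected_hex):
    """Return True if data's digest matches the expected hex digest."""
    digest = hashlib.new(algorithm, data).hexdigest()
    return digest == expected_hex

payload = b"example wheel bytes"                 # stand-in for a download
good = hashlib.sha256(payload).hexdigest()       # the pinned, known-good hash

print(check_hash(payload, "sha256", good))       # True
print(check_hash(b"tampered bytes", "sha256", good))   # False
```

As the code comments note, this is not primarily a security check against the index: its main value is failing fast when a pinned artifact changes out from under you.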

The module is then downloaded via req.populate_link:

```python
def populate_link(self, finder, upgrade, require_hashes):
    # type: (PackageFinder, bool, bool) -> None
    """Ensure that if a link can be found for this, that it is found.

    Note that self.link may still be None - if Upgrade is False and the
    requirement is already installed.

    If require_hashes is True, don't use the wheel cache, because cached
    wheels, always built locally, have different hashes than the files
    downloaded from the index server and thus throw false hash mismatches.
    Furthermore, cached wheels at present have undeterministic contents due
    to file modification times.
    """
    if self.link is None:
        self.link = finder.find_requirement(self, upgrade)      # locate the file to download
    if self._wheel_cache is not None and not require_hashes:
        old_link = self.link
        self.link = self._wheel_cache.get(self.link, self.name)
        if old_link != self.link:
            logger.debug('Using cached wheel link: %s', self.link)
```

The corresponding find_requirement function runs as follows:

```python
def find_requirement(self, req, upgrade):
    # type: (InstallRequirement, bool) -> Optional[Link]
    """Try to find a Link matching req

    Expects req, an InstallRequirement and upgrade, a boolean
    Returns a Link if found,
    Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
    """
    all_candidates = self.find_all_candidates(req.name)     # collect the candidate links

    # Filter out anything which doesn't match our specifier
    compatible_versions = set(
        req.specifier.filter(
            # We turn the version object into a str here because otherwise
            # when we're debundled but setuptools isn't, Python will see
            # packaging.version.Version and
            # pkg_resources._vendor.packaging.version.Version as different
            # types. This way we'll use a str as a common data interchange
            # format. If we stop using the pkg_resources provided specifier
            # and start using our own, we can drop the cast to str().
            [str(c.version) for c in all_candidates],
            prereleases=(
                self.allow_all_prereleases
                if self.allow_all_prereleases else None
            ),
        )
    )
    ...
    return best_candidate.location
```
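find_requirement then narrows all_candidates to the versions compatible with the requirement's specifier and picks the best one. A much-simplified stand-in for that filtering (plain dotted numeric versions only; pip's real req.specifier.filter handles full PEP 440 semantics via the vendored packaging library):

```python
def parse(version):
    """Turn '2.1.7' into (2, 1, 7) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))

def filter_candidates(versions, minimum=None, maximum=None):
    """Keep versions within [minimum, maximum] -- a toy stand-in for
    req.specifier.filter()."""
    keep = []
    for v in versions:
        if minimum and parse(v) < parse(minimum):
            continue
        if maximum and parse(v) > parse(maximum):
            continue
        keep.append(v)
    return keep

candidates = ["1.11.20", "2.0.13", "2.1.7"]
compatible = filter_candidates(candidates, minimum="2.0.0")
best = max(compatible, key=parse)   # the "best candidate"
print(compatible, best)             # ['2.0.13', '2.1.7'] 2.1.7
```

Note that string comparison alone would get this wrong ("1.11.20" > "1.2.0" lexically is False), which is why versions are parsed into tuples first.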

The find_all_candidates function is shown below:

def find_all_candidates(self, project_name):
    # type: (str) -> List[Optional[InstallationCandidate]]
    """Find all available InstallationCandidate for project_name

    This checks index_urls and find_links.
    All versions found are returned as an InstallationCandidate list.

    See _link_package_versions for details on which files are accepted
    """
    index_locations = self._get_index_urls_locations(project_name)
    index_file_loc, index_url_loc = self._sort_locations(index_locations)
    fl_file_loc, fl_url_loc = self._sort_locations(
        self.find_links, expand_dir=True,
    )

    file_locations = (Link(url) for url in itertools.chain(
        index_file_loc, fl_file_loc,
    ))

    # We trust every url that the user has given us whether it was given
    #   via --index-url or --find-links.
    # We want to filter out any thing which does not have a secure origin.
    url_locations = [
        link for link in itertools.chain(
            (Link(url) for url in index_url_loc),
            (Link(url) for url in fl_url_loc),
        )
        if self._validate_secure_origin(logger, link)
    ]                                                           # collect the index locations to search

    logger.debug('%d location(s) to search for versions of %s:',
                 len(url_locations), project_name)
    for location in url_locations:
        logger.debug('* %s', location)

    canonical_name = canonicalize_name(project_name)
    formats = self.format_control.get_allowed_formats(canonical_name)
    search = Search(project_name, canonical_name, formats)
    find_links_versions = self._package_versions(
        # We trust every directly linked archive in find_links
        (Link(url, '-f') for url in self.find_links),
        search
    )

    page_versions = []
    for page in self._get_pages(url_locations, project_name):   # request the remote index pages
        logger.debug('Analyzing links from page %s', page.url)
        with indent_log():
            page_versions.extend(
                self._package_versions(page.iter_links(), search)
            )

    file_versions = self._package_versions(file_locations, search)  # versions found in local files
    if file_versions:
        file_versions.sort(reverse=True)
        logger.debug(
            'Local files found: %s',
            ', '.join([
                url_to_path(candidate.location.url)
                for candidate in file_versions
            ])
        )

    # This is an intentional priority ordering
    return file_versions + find_links_versions + page_versions
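The _sort_locations call above separates local file paths from remote URLs before pip decides which ones need a network request. A rough stdlib approximation of that split (the helper below is a simplified sketch, not pip's actual implementation, and ignores edge cases such as Windows drive letters):

```python
from urllib.parse import urlparse

def sort_locations(locations):
    # Split candidate locations into local files and remote URLs,
    # roughly what pip's _sort_locations does in the simple cases.
    file_locations, url_locations = [], []
    for loc in locations:
        scheme = urlparse(loc).scheme
        if scheme in ("http", "https"):
            url_locations.append(loc)
        elif scheme == "file" or not scheme:
            # file:// URLs and bare paths are treated as local.
            file_locations.append(loc)
    return file_locations, url_locations

files, urls = sort_locations([
    "https://mirrors.aliyun.com/pypi/simple/django/",
    "file:///tmp/wheels/",
    "/home/user/packages",
])
print(files)   # ['file:///tmp/wheels/', '/home/user/packages']
print(urls)    # ['https://mirrors.aliyun.com/pypi/simple/django/']
```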

The main work here is done by _get_pages, which fetches the index page contents:

def _get_pages(self, locations, project_name):
    # type: (Iterable[Link], str) -> Iterable[HTMLPage]
    """
    Yields (page, page_url) from the given locations, skipping
    locations that have errors.
    """
    seen = set()  # type: Set[Link]
    for location in locations:
        if location in seen:
            continue
        seen.add(location)

        page = _get_html_page(location, session=self.session)   # fetch the HTML page
        if page is None:
            continue

        yield page


def _get_html_page(link, session=None):
    # type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
    if session is None:
        raise TypeError(
            "_get_html_page() missing 1 required keyword argument: 'session'"
        )

    url = link.url.split('#', 1)[0]

    # Check for VCS schemes that do not support lookup as web pages.
    vcs_scheme = _match_vcs_scheme(url)
    if vcs_scheme:
        logger.debug('Cannot look at %s URL %s', vcs_scheme, link)
        return None

    # Tack index.html onto file:// URLs that point to directories
    scheme, _, path, _, _, _ = urllib_parse.urlparse(url)               # parse the URL
    if (scheme == 'file' and os.path.isdir(urllib_request.url2pathname(path))):
        # add trailing slash if not present so urljoin doesn't trim
        # final segment
        if not url.endswith('/'):
            url += '/'
        url = urllib_parse.urljoin(url, 'index.html')
        logger.debug(' file: URL is directory, getting %s', url)

    try:
        resp = _get_html_response(url, session=session)                 # send the request
    except _NotHTTP as exc:
        logger.debug(
            'Skipping page %s because it looks like an archive, and cannot '
            'be checked by HEAD.', link,
        )
    except _NotHTML as exc:
        logger.debug(
            'Skipping page %s because the %s request got Content-Type: %s',
            link, exc.request_desc, exc.content_type,
        )
    except requests.HTTPError as exc:
        _handle_get_page_fail(link, exc)
    except RetryError as exc:
        _handle_get_page_fail(link, exc)
    except SSLError as exc:
        reason = "There was a problem confirming the ssl certificate: "
        reason += str(exc)
        _handle_get_page_fail(link, reason, meth=logger.info)
    except requests.ConnectionError as exc:
        _handle_get_page_fail(link, "connection error: %s" % exc)
    except requests.Timeout:
        _handle_get_page_fail(link, "timed out")
    else:
        return HTMLPage(resp.content, resp.url, resp.headers)

    return None


def _get_html_response(url, session):
    # type: (str, PipSession) -> Response
    """Access an HTML page with GET, and return the response.

    This consists of three parts:

    1. If the URL looks suspiciously like an archive, send a HEAD first to
       check the Content-Type is HTML, to avoid downloading a large file.
       Raise `_NotHTTP` if the content type cannot be determined, or
       `_NotHTML` if it is not HTML.

    2. Actually perform the request. Raise HTTP exceptions on network failures.

    3. Check the Content-Type header to make sure we got HTML, and raise
       `_NotHTML` otherwise.
    """
    if _is_url_like_archive(url):
        _ensure_html_response(url, session=session)

    logger.debug('Getting page %s', url)

    resp = session.get(
        url,
        headers={
            "Accept": "text/html",
            # We don't want to blindly returned cached data for
            # /simple/, because authors generally expecting that
            # twine upload && pip install will function, but if
            # they've done a pip install in the last ~10 minutes
            # it won't. Thus by setting this to zero we will not
            # blindly use any cached data, however the benefit of
            # using max-age=0 instead of no-cache, is that we will
            # still support conditional requests, so we will still
            # minimize traffic sent in cases where the page hasn't
            # changed at all, we will just always incur the round
            # trip for the conditional GET now instead of only
            # once per 10 minutes.
            # For more information, please see pypa/pip#5670.
            "Cache-Control": "max-age=0",
        },
    )                                                           # fetch the page through the session
    resp.raise_for_status()

    # The check for archives above only works if the url ends with
    # something that looks like an archive. However that is not a
    # requirement of an url. Unless we issue a HEAD request on every
    # url we cannot know ahead of time for sure if something is HTML
    # or not. However we can check after we've downloaded it.
    _ensure_html_header(resp)

    return resp
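The URL preprocessing in _get_html_page (drop the `#fragment`, then point a file:// directory URL at its index.html) can be exercised standalone with the stdlib. This is a sketch of just that preprocessing, not the full function; the helper name normalize_index_url is my own:

```python
import os
import tempfile
from urllib.parse import urljoin, urlparse
from urllib.request import pathname2url, url2pathname

def normalize_index_url(url):
    # Mirror _get_html_page's preprocessing: strip the fragment, then
    # append index.html to file:// URLs that point at directories.
    url = url.split('#', 1)[0]
    parts = urlparse(url)
    if parts.scheme == 'file' and os.path.isdir(url2pathname(parts.path)):
        if not url.endswith('/'):
            url += '/'                 # so urljoin keeps the final segment
        url = urljoin(url, 'index.html')
    return url

# A remote URL only loses its fragment.
print(normalize_index_url('https://mirrors.aliyun.com/pypi/simple/django/#sha256=abc'))
# → https://mirrors.aliyun.com/pypi/simple/django/

# A file: URL pointing at a real directory gains index.html.
tmpdir = tempfile.mkdtemp()
file_url = 'file:' + pathname2url(tmpdir)
print(normalize_index_url(file_url))   # ends with .../index.html
```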

This completes the analysis of the request process; the main call chain is shown below:

(figure: pip install call-flow diagram)

Summary

This post is only a quick look at pip's request flow; it does not analyze pip's concrete usage scenarios in any depth. It was prompted by noticing, while installing a package, that the source index had not been switched, so I simply traced the execution path. Readers who are interested can dig deeper on their own, and corrections to any mistakes are welcome.

Reprinted from: https://blog.csdn.net/qq_33339479/article/details/93094774
