1from __future__ import annotations
2
3import configparser
4import logging
5import os
6
7from pip._internal.exceptions import BadCommand, InstallationError
8from pip._internal.utils.misc import HiddenText, display_path
9from pip._internal.utils.subprocess import make_command
10from pip._internal.utils.urls import path_to_url
11from pip._internal.vcs.versioncontrol import (
12 RevOptions,
13 VersionControl,
14 find_path_to_project_root_from_repo_root,
15 vcs,
16)
17
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(name=__name__)
19
20
class Mercurial(VersionControl):
    """Mercurial (``hg``) backend built on top of ``VersionControl``.

    Repository operations are expressed as ``hg`` sub-command argument lists
    ("clone", "update", "pull", "root", ...) and handed to ``run_command``,
    which is inherited from the base class.
    """

    # Short identifier of this backend.
    name = "hg"
    # Directory inside a checkout that holds the repository's ``hgrc``.
    dirname = ".hg"
    # Not referenced in this class; presumably consumed by the base class.
    repo_name = "clone"
    # URL schemes handled by this backend.
    schemes = (
        "hg+file",
        "hg+http",
        "hg+https",
        "hg+ssh",
        "hg+static-http",
    )

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """Return the command-line arguments that select revision *rev*."""
        return [f"--rev={rev}"]

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """Clone *url* into *dest*, then update to the requested revision.

        *verbosity* selects the flags passed to both commands:
        ``<= 0`` gives ``--quiet``, ``1`` gives no flag, ``2`` gives
        ``--verbose`` and anything higher gives ``--verbose --debug``.
        """
        rev_display = rev_options.to_display()
        logger.info(
            "Cloning hg %s%s to %s",
            url,
            rev_display,
            display_path(dest),
        )
        if verbosity <= 0:
            flags: tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        elif verbosity == 2:
            flags = ("--verbose",)
        else:
            flags = ("--verbose", "--debug")
        # Clone without populating the working copy, then update explicitly
        # so the revision arguments from ``rev_options`` are honoured.
        self.run_command(make_command("clone", "--noupdate", *flags, url, dest))
        self.run_command(
            make_command("update", *flags, rev_options.to_args()),
            cwd=dest,
        )

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """Re-point the checkout at *dest* to *url* and update it.

        The ``paths.default`` entry of the repository's ``hgrc`` is rewritten
        to the new URL and ``update`` is then run with the arguments from
        *rev_options*. If the ``hgrc`` cannot be read, parsed or written, a
        warning is logged and the update step is skipped.
        """
        extra_flags = ["-q"] if verbosity <= 0 else []
        repo_config = os.path.join(dest, self.dirname, "hgrc")
        config = configparser.RawConfigParser()

        try:
            config.read(repo_config)
            config.set("paths", "default", url.secret)
            with open(repo_config, "w") as config_file:
                config.write(config_file)
        except (OSError, configparser.Error) as exc:
            # configparser.Error is the common base of NoSectionError (no
            # [paths] section) and of the parsing/duplicate errors raised by
            # config.read() for an hgrc that configparser cannot handle
            # (e.g. Mercurial-only directives such as "%include"). All of
            # them mean the switch cannot be performed, so report them the
            # same way instead of letting a raw traceback escape.
            logger.warning("Could not switch Mercurial repository to %s: %s", url, exc)
        else:
            cmd_args = make_command("update", *extra_flags, rev_options.to_args())
            self.run_command(cmd_args, cwd=dest)

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """Pull into the checkout at *dest* and update it.

        Runs ``pull`` followed by ``update`` with the arguments from
        *rev_options*; both get ``-q`` when *verbosity* is ``<= 0``.
        *url* is accepted for interface compatibility but is not used here.
        """
        extra_flags = ["-q"] if verbosity <= 0 else []

        self.run_command(["pull", *extra_flags], cwd=dest)
        cmd_args = make_command("update", *extra_flags, rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """Return the ``paths.default`` value of the repository at *location*.

        When the inherited ``_is_local_repository`` helper reports the value
        as a local repository, it is converted with ``path_to_url``.
        """
        url = cls.run_command(
            ["showconfig", "paths.default"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if cls._is_local_repository(url):
            url = path_to_url(url)
        return url

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the repository-local changeset revision number, as a string
        (the stripped output of ``parents --template={rev}``).
        """
        return cls.run_command(
            ["parents", "--template={rev}"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()

    @classmethod
    def get_requirement_revision(cls, location: str) -> str:
        """
        Return the changeset identification hash, as a 40-character
        hexadecimal string
        """
        return cls.run_command(
            ["parents", "--template={node}"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """Always assume the versions don't match"""
        return False

    @classmethod
    def get_subdirectory(cls, location: str) -> str | None:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        repo_root = cls.run_command(
            ["root"], show_stdout=False, stdout_only=True, cwd=location
        ).strip()
        # A relative answer is resolved against *location*, the directory the
        # command was run in.
        if not os.path.isabs(repo_root):
            repo_root = os.path.abspath(os.path.join(location, repo_root))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_repository_root(cls, location: str) -> str | None:
        """Return the root of the repository containing *location*, or None.

        The base-class lookup is tried first; only when it finds nothing is
        the ``root`` command run. ``None`` is returned when that command
        raises ``BadCommand`` (logged at debug level as hg being unavailable)
        or ``InstallationError``.
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["root"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under hg control "
                "because hg is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        # Drop only the trailing line terminator before normalising the path.
        return os.path.normpath(r.rstrip("\r\n"))
185
# Register the Mercurial backend with the shared ``vcs`` registry at import time.
vcs.register(Mercurial)