1from __future__ import annotations
2
3import logging
4import os
5import re
6
7from pip._internal.utils.misc import (
8 HiddenText,
9 display_path,
10 is_console_interactive,
11 is_installable_dir,
12 split_auth_from_netloc,
13)
14from pip._internal.utils.subprocess import CommandArgs, make_command
15from pip._internal.vcs.versioncontrol import (
16 AuthInfo,
17 RemoteNotFoundError,
18 RevOptions,
19 VersionControl,
20 vcs,
21)
22
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Patterns that extract the repository URL:
#   - from the ``url="..."`` attribute of an XML-format ``entries`` file;
#   - from the ``<url>...</url>`` element of ``svn info --xml`` output.
_svn_xml_url_re = re.compile('url="([^"]+)"')
_svn_info_xml_url_re = re.compile(r"<url>(.*)</url>")

# Patterns that extract revision numbers (one match per occurrence):
#   - ``committed-rev="N"`` attributes of an XML-format ``entries`` file;
#   - ``revision="N"`` attributes of ``svn info --xml`` output.
_svn_rev_re = re.compile(r'committed-rev="(\d+)"')
_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"')
29
30
class Subversion(VersionControl):
    """Subversion (``svn``) implementation of :class:`VersionControl`.

    The class builds the argument lists for the ``svn checkout``, ``svn
    switch`` and ``svn update`` commands, and reads the URL and revision of
    an existing working copy either from its ``.svn/entries`` file or from
    the output of ``svn info --xml``.
    """

    name = "svn"
    dirname = ".svn"
    repo_name = "checkout"
    schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file")

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """Return ``True`` unconditionally, whatever *remote_url* is."""
        return True

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """Return the ``svn`` arguments that select revision *rev*."""
        return ["-r", rev]

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the maximum revision for all files under a given location

        The tree below *location* is walked top-down. Directories without a
        ``.svn`` sub-directory are not descended into, and directories whose
        URL does not live below the URL of *location* itself are skipped
        together with their children.

        :param location: Root directory of the working copy to inspect.
        :return: The highest revision found, as a string (``"0"`` if none).
        """
        # Note: taken from setuptools.command.egg_info
        revision = 0
        # URL (with trailing slash) of the working copy rooted at ``location``.
        # It is kept in its own variable: storing it in ``base`` would be
        # undone by the next iteration, which rebinds ``base`` to a filesystem
        # path, so no sub-directory would ever be recognised as part of the
        # same tree.
        root_url: str | None = None

        for base, dirs, _ in os.walk(location):
            if cls.dirname not in dirs:
                dirs[:] = []
                continue  # no sense walking uncontrolled subdirs
            dirs.remove(cls.dirname)
            entries_fn = os.path.join(base, cls.dirname, "entries")
            if not os.path.exists(entries_fn):
                # FIXME: should we warn?
                continue

            dirurl, localrev = cls._get_svn_url_rev(base)

            if base == location:
                assert dirurl is not None
                root_url = dirurl + "/"  # save the root url
            elif (
                not dirurl
                or root_url is None  # root had no readable metadata
                or not dirurl.startswith(root_url)
            ):
                dirs[:] = []
                continue  # not part of the same svn tree, skip it
            revision = max(revision, localrev)
        return str(revision)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> tuple[str, tuple[str | None, str | None]]:
        """
        This override allows the auth information to be passed to svn via the
        --username and --password options instead of via the URL.

        :param netloc: Network location part of the URL, possibly containing
            ``user:password@``.
        :param scheme: URL scheme; ``"ssh"`` is delegated to the base class.
        :return: ``(netloc, (username, password))``.
        """
        if scheme == "ssh":
            # The --username and --password options can't be used for
            # svn+ssh URLs, so keep the auth information in the URL.
            return super().get_netloc_and_auth(netloc, scheme)

        return split_auth_from_netloc(netloc)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
        """Split *url* into ``(url, rev, auth)`` using the base class.

        If the URL returned by the base class starts with ``ssh://``, the
        ``svn+`` prefix is put back in front of it.
        """
        # hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it
        url, rev, user_pass = super().get_url_rev_and_auth(url)
        if url.startswith("ssh://"):
            url = "svn+" + url
        return url, rev, user_pass

    @staticmethod
    def make_rev_args(username: str | None, password: HiddenText | None) -> CommandArgs:
        """Return ``--username``/``--password`` arguments for the given auth.

        An option is only emitted when the corresponding value is truthy, so
        the result may be an empty list.
        """
        extra_args: CommandArgs = []
        if username:
            extra_args += ["--username", username]
        if password:
            extra_args += ["--password", password]

        return extra_args

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """Return the repository URL of the project containing *location*.

        :param location: A directory inside a working copy.
        :return: The URL reported for the nearest installable parent directory.
        :raises RemoteNotFoundError: If no parent directory is an installable
            project, or if no URL could be determined for it.
        """
        # In cases where the source is in a subdirectory, we have to look up in
        # the location until we find a valid project root.
        orig_location = location
        while not is_installable_dir(location):
            last_location = location
            location = os.path.dirname(location)
            if location == last_location:
                # We've traversed up to the root of the filesystem without
                # finding a Python project.
                logger.warning(
                    "Could not find Python project for directory %s (tried all "
                    "parent directories)",
                    orig_location,
                )
                raise RemoteNotFoundError

        url, _rev = cls._get_svn_url_rev(location)
        if url is None:
            raise RemoteNotFoundError

        return url

    @classmethod
    def _get_svn_url_rev(cls, location: str) -> tuple[str | None, int]:
        """Return ``(url, revision)`` for the working copy at *location*.

        Three sources are tried, depending on the content of
        ``<location>/.svn/entries``:

        * data starting with ``8``, ``9`` or ``10``: the plain-text entries
          format, split into records on form-feed separators;
        * data starting with ``<?xml``: the XML entries format;
        * anything else (including a missing file): ``svn info --xml``.

        :return: The URL (``None`` if ``svn info`` failed with an
            ``InstallationError``) and the highest revision found (``0`` if
            none was found).
        :raises ValueError: If no URL can be found in XML entries data or in
            the output of ``svn info --xml``.
        """
        from pip._internal.exceptions import InstallationError

        entries_path = os.path.join(location, cls.dirname, "entries")
        if os.path.exists(entries_path):
            # Decode explicitly instead of relying on the locale's default
            # encoding. Only the URL and revision fields are consumed below,
            # so undecodable bytes elsewhere are replaced rather than fatal.
            # NOTE(review): assumes the entries file is UTF-8 -- confirm.
            with open(entries_path, encoding="utf-8", errors="replace") as f:
                data = f.read()
        else:  # subversion >= 1.7 does not have the 'entries' file
            data = ""

        url: str | None = None
        revs: list[int]
        if data.startswith(("8", "9", "10")):
            entries = list(map(str.splitlines, data.split("\n\x0c\n")))
            del entries[0][0]  # get rid of the '8'
            url = entries[0][3]
            revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0]
        elif data.startswith("<?xml"):
            match = _svn_xml_url_re.search(data)
            if not match:
                raise ValueError(f"Badly formatted data: {data!r}")
            url = match.group(1)  # get repository URL
            revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0]
        else:
            try:
                # subversion >= 1.7
                # Note that using get_remote_call_options is not necessary here
                # because `svn info` is being run against a local directory.
                # We don't need to worry about making sure interactive mode
                # is being used to prompt for passwords, because passwords
                # are only potentially needed for remote server requests.
                xml = cls.run_command(
                    ["info", "--xml", location],
                    show_stdout=False,
                    stdout_only=True,
                )
                match = _svn_info_xml_url_re.search(xml)
                if match is None:
                    # Explicit check instead of ``assert`` so that the failure
                    # is the same with and without ``python -O``.
                    raise ValueError(f"Badly formatted data: {xml!r}")
                url = match.group(1)
                revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)]
            except InstallationError:
                url, revs = None, []

        return url, max(revs, default=0)

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """Always assume the versions don't match"""
        return False

    def __init__(self, use_interactive: bool | None = None) -> None:
        """Create the backend.

        :param use_interactive: Whether ``svn`` may prompt the user. When
            ``None``, the result of ``is_console_interactive()`` is used.
        """
        if use_interactive is None:
            use_interactive = is_console_interactive()
        self.use_interactive = use_interactive

        # This member is used to cache the fetched version of the current
        # ``svn`` client.
        # Special value definitions:
        #   None: Not evaluated yet.
        #   Empty tuple: Could not parse version.
        self._vcs_version: tuple[int, ...] | None = None

        super().__init__()

    def call_vcs_version(self) -> tuple[int, ...]:
        """Query the version of the currently installed Subversion client.

        :return: A tuple containing the parts of the version information or
            ``()`` if the version returned from ``svn`` could not be parsed.
        :raises: BadCommand: If ``svn`` is not installed.
        """
        # Example versions:
        #   svn, version 1.10.3 (r1842928)
        #      compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0
        #   svn, version 1.7.14 (r1542130)
        #      compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu
        #   svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0)
        #      compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2
        version_prefix = "svn, version "
        version = self.run_command(["--version"], show_stdout=False, stdout_only=True)
        if not version.startswith(version_prefix):
            return ()

        fields = version[len(version_prefix) :].split()
        if not fields:
            # Nothing follows the prefix: report "could not parse" as
            # documented instead of failing with an IndexError.
            return ()

        # Drop any vendor suffix such as "-SlikSvn" before splitting on dots.
        version_list = fields[0].partition("-")[0].split(".")
        try:
            parsed_version = tuple(map(int, version_list))
        except ValueError:
            return ()

        return parsed_version

    def get_vcs_version(self) -> tuple[int, ...]:
        """Return the version of the currently installed Subversion client.

        If the version of the Subversion client has already been queried,
        a cached value will be used.

        :return: A tuple containing the parts of the version information or
            ``()`` if the version returned from ``svn`` could not be parsed.
        :raises: BadCommand: If ``svn`` is not installed.
        """
        if self._vcs_version is not None:
            # Use cached version, if available.
            # If parsing the version failed previously (empty tuple),
            # do not attempt to parse it again.
            return self._vcs_version

        vcs_version = self.call_vcs_version()
        self._vcs_version = vcs_version
        return vcs_version

    def get_remote_call_options(self) -> CommandArgs:
        """Return options to be used on calls to Subversion that contact the server.

        These options are applicable for the following ``svn`` subcommands used
        in this class.

            - checkout
            - switch
            - update

        :return: A list of command line arguments to pass to ``svn``.
        """
        if not self.use_interactive:
            # --non-interactive switch is available since Subversion 0.14.4.
            # Subversion < 1.8 runs in interactive mode by default.
            return ["--non-interactive"]

        svn_version = self.get_vcs_version()
        # By default, Subversion >= 1.8 runs in non-interactive mode if
        # stdin is not a TTY. Since that is how pip invokes SVN, in
        # call_subprocess(), pip must pass --force-interactive to ensure
        # the user can be prompted for a password, if required.
        #   SVN added the --force-interactive option in SVN 1.8. Since
        # e.g. RHEL/CentOS 7, which is supported until 2024, ships with
        # SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip
        # can't safely add the option if the SVN version is < 1.8 (or unknown).
        if svn_version >= (1, 8):
            return ["--force-interactive"]

        return []

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """Check out *url* into *dest* with ``svn checkout``.

        :param dest: Target directory of the checkout.
        :param url: Repository URL to check out.
        :param rev_options: Revision options; supplies the display string and
            the extra command-line arguments.
        :param verbosity: ``--quiet`` is passed to ``svn`` when this is ``<= 0``.
        """
        rev_display = rev_options.to_display()
        logger.info(
            "Checking out %s%s to %s",
            url,
            rev_display,
            display_path(dest),
        )
        flags = ["--quiet"] if verbosity <= 0 else []
        cmd_args = make_command(
            "checkout",
            *flags,
            self.get_remote_call_options(),
            rev_options.to_args(),
            url,
            dest,
        )
        self.run_command(cmd_args)

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """Run ``svn switch`` so that the working copy *dest* points at *url*.

        *verbosity* is accepted but not used by this method.
        """
        cmd_args = make_command(
            "switch",
            self.get_remote_call_options(),
            rev_options.to_args(),
            url,
            dest,
        )
        self.run_command(cmd_args)

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """Run ``svn update`` on the working copy *dest*.

        *url* and *verbosity* are accepted but not used by this method.
        """
        cmd_args = make_command(
            "update",
            self.get_remote_call_options(),
            rev_options.to_args(),
            dest,
        )
        self.run_command(cmd_args)
334
# Runs at import time: hand the Subversion class to the imported ``vcs``
# object via its ``register`` method.
vcs.register(Subversion)