Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

156 statements  

1from __future__ import annotations 

2 

3import logging 

4import os 

5import re 

6 

7from pip._internal.utils.misc import ( 

8 HiddenText, 

9 display_path, 

10 is_console_interactive, 

11 is_installable_dir, 

12 split_auth_from_netloc, 

13) 

14from pip._internal.utils.subprocess import CommandArgs, make_command 

15from pip._internal.vcs.versioncontrol import ( 

16 AuthInfo, 

17 RemoteNotFoundError, 

18 RevOptions, 

19 VersionControl, 

20 vcs, 

21) 

22 

23logger = logging.getLogger(__name__) 

24 

25_svn_xml_url_re = re.compile('url="([^"]+)"') 

26_svn_rev_re = re.compile(r'committed-rev="(\d+)"') 

27_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"') 

28_svn_info_xml_url_re = re.compile(r"<url>(.*)</url>") 

29 

30 

31class Subversion(VersionControl): 

32 name = "svn" 

33 dirname = ".svn" 

34 repo_name = "checkout" 

35 schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file") 

36 

37 @classmethod 

38 def should_add_vcs_url_prefix(cls, remote_url: str) -> bool: 

39 return True 

40 

41 @staticmethod 

42 def get_base_rev_args(rev: str) -> list[str]: 

43 return ["-r", rev] 

44 

45 @classmethod 

46 def get_revision(cls, location: str) -> str: 

47 """ 

48 Return the maximum revision for all files under a given location 

49 """ 

50 # Note: taken from setuptools.command.egg_info 

51 revision = 0 

52 

53 for base, dirs, _ in os.walk(location): 

54 if cls.dirname not in dirs: 

55 dirs[:] = [] 

56 continue # no sense walking uncontrolled subdirs 

57 dirs.remove(cls.dirname) 

58 entries_fn = os.path.join(base, cls.dirname, "entries") 

59 if not os.path.exists(entries_fn): 

60 # FIXME: should we warn? 

61 continue 

62 

63 dirurl, localrev = cls._get_svn_url_rev(base) 

64 

65 if base == location: 

66 assert dirurl is not None 

67 base = dirurl + "/" # save the root url 

68 elif not dirurl or not dirurl.startswith(base): 

69 dirs[:] = [] 

70 continue # not part of the same svn tree, skip it 

71 revision = max(revision, localrev) 

72 return str(revision) 

73 

74 @classmethod 

75 def get_netloc_and_auth( 

76 cls, netloc: str, scheme: str 

77 ) -> tuple[str, tuple[str | None, str | None]]: 

78 """ 

79 This override allows the auth information to be passed to svn via the 

80 --username and --password options instead of via the URL. 

81 """ 

82 if scheme == "ssh": 

83 # The --username and --password options can't be used for 

84 # svn+ssh URLs, so keep the auth information in the URL. 

85 return super().get_netloc_and_auth(netloc, scheme) 

86 

87 return split_auth_from_netloc(netloc) 

88 

89 @classmethod 

90 def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]: 

91 # hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it 

92 url, rev, user_pass = super().get_url_rev_and_auth(url) 

93 if url.startswith("ssh://"): 

94 url = "svn+" + url 

95 return url, rev, user_pass 

96 

97 @staticmethod 

98 def make_rev_args(username: str | None, password: HiddenText | None) -> CommandArgs: 

99 extra_args: CommandArgs = [] 

100 if username: 

101 extra_args += ["--username", username] 

102 if password: 

103 extra_args += ["--password", password] 

104 

105 return extra_args 

106 

107 @classmethod 

108 def get_remote_url(cls, location: str) -> str: 

109 # In cases where the source is in a subdirectory, we have to look up in 

110 # the location until we find a valid project root. 

111 orig_location = location 

112 while not is_installable_dir(location): 

113 last_location = location 

114 location = os.path.dirname(location) 

115 if location == last_location: 

116 # We've traversed up to the root of the filesystem without 

117 # finding a Python project. 

118 logger.warning( 

119 "Could not find Python project for directory %s (tried all " 

120 "parent directories)", 

121 orig_location, 

122 ) 

123 raise RemoteNotFoundError 

124 

125 url, _rev = cls._get_svn_url_rev(location) 

126 if url is None: 

127 raise RemoteNotFoundError 

128 

129 return url 

130 

131 @classmethod 

132 def _get_svn_url_rev(cls, location: str) -> tuple[str | None, int]: 

133 from pip._internal.exceptions import InstallationError 

134 

135 entries_path = os.path.join(location, cls.dirname, "entries") 

136 if os.path.exists(entries_path): 

137 with open(entries_path) as f: 

138 data = f.read() 

139 else: # subversion >= 1.7 does not have the 'entries' file 

140 data = "" 

141 

142 url = None 

143 if data.startswith(("8", "9", "10")): 

144 entries = list(map(str.splitlines, data.split("\n\x0c\n"))) 

145 del entries[0][0] # get rid of the '8' 

146 url = entries[0][3] 

147 revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0] 

148 elif data.startswith("<?xml"): 

149 match = _svn_xml_url_re.search(data) 

150 if not match: 

151 raise ValueError(f"Badly formatted data: {data!r}") 

152 url = match.group(1) # get repository URL 

153 revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0] 

154 else: 

155 try: 

156 # subversion >= 1.7 

157 # Note that using get_remote_call_options is not necessary here 

158 # because `svn info` is being run against a local directory. 

159 # We don't need to worry about making sure interactive mode 

160 # is being used to prompt for passwords, because passwords 

161 # are only potentially needed for remote server requests. 

162 xml = cls.run_command( 

163 ["info", "--xml", location], 

164 show_stdout=False, 

165 stdout_only=True, 

166 ) 

167 match = _svn_info_xml_url_re.search(xml) 

168 assert match is not None 

169 url = match.group(1) 

170 revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)] 

171 except InstallationError: 

172 url, revs = None, [] 

173 

174 if revs: 

175 rev = max(revs) 

176 else: 

177 rev = 0 

178 

179 return url, rev 

180 

181 @classmethod 

182 def is_commit_id_equal(cls, dest: str, name: str | None) -> bool: 

183 """Always assume the versions don't match""" 

184 return False 

185 

186 def __init__(self, use_interactive: bool | None = None) -> None: 

187 if use_interactive is None: 

188 use_interactive = is_console_interactive() 

189 self.use_interactive = use_interactive 

190 

191 # This member is used to cache the fetched version of the current 

192 # ``svn`` client. 

193 # Special value definitions: 

194 # None: Not evaluated yet. 

195 # Empty tuple: Could not parse version. 

196 self._vcs_version: tuple[int, ...] | None = None 

197 

198 super().__init__() 

199 

200 def call_vcs_version(self) -> tuple[int, ...]: 

201 """Query the version of the currently installed Subversion client. 

202 

203 :return: A tuple containing the parts of the version information or 

204 ``()`` if the version returned from ``svn`` could not be parsed. 

205 :raises: BadCommand: If ``svn`` is not installed. 

206 """ 

207 # Example versions: 

208 # svn, version 1.10.3 (r1842928) 

209 # compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0 

210 # svn, version 1.7.14 (r1542130) 

211 # compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu 

212 # svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0) 

213 # compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2 

214 version_prefix = "svn, version " 

215 version = self.run_command(["--version"], show_stdout=False, stdout_only=True) 

216 if not version.startswith(version_prefix): 

217 return () 

218 

219 version = version[len(version_prefix) :].split()[0] 

220 version_list = version.partition("-")[0].split(".") 

221 try: 

222 parsed_version = tuple(map(int, version_list)) 

223 except ValueError: 

224 return () 

225 

226 return parsed_version 

227 

228 def get_vcs_version(self) -> tuple[int, ...]: 

229 """Return the version of the currently installed Subversion client. 

230 

231 If the version of the Subversion client has already been queried, 

232 a cached value will be used. 

233 

234 :return: A tuple containing the parts of the version information or 

235 ``()`` if the version returned from ``svn`` could not be parsed. 

236 :raises: BadCommand: If ``svn`` is not installed. 

237 """ 

238 if self._vcs_version is not None: 

239 # Use cached version, if available. 

240 # If parsing the version failed previously (empty tuple), 

241 # do not attempt to parse it again. 

242 return self._vcs_version 

243 

244 vcs_version = self.call_vcs_version() 

245 self._vcs_version = vcs_version 

246 return vcs_version 

247 

248 def get_remote_call_options(self) -> CommandArgs: 

249 """Return options to be used on calls to Subversion that contact the server. 

250 

251 These options are applicable for the following ``svn`` subcommands used 

252 in this class. 

253 

254 - checkout 

255 - switch 

256 - update 

257 

258 :return: A list of command line arguments to pass to ``svn``. 

259 """ 

260 if not self.use_interactive: 

261 # --non-interactive switch is available since Subversion 0.14.4. 

262 # Subversion < 1.8 runs in interactive mode by default. 

263 return ["--non-interactive"] 

264 

265 svn_version = self.get_vcs_version() 

266 # By default, Subversion >= 1.8 runs in non-interactive mode if 

267 # stdin is not a TTY. Since that is how pip invokes SVN, in 

268 # call_subprocess(), pip must pass --force-interactive to ensure 

269 # the user can be prompted for a password, if required. 

270 # SVN added the --force-interactive option in SVN 1.8. Since 

271 # e.g. RHEL/CentOS 7, which is supported until 2024, ships with 

272 # SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip 

273 # can't safely add the option if the SVN version is < 1.8 (or unknown). 

274 if svn_version >= (1, 8): 

275 return ["--force-interactive"] 

276 

277 return [] 

278 

279 def fetch_new( 

280 self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int 

281 ) -> None: 

282 rev_display = rev_options.to_display() 

283 logger.info( 

284 "Checking out %s%s to %s", 

285 url, 

286 rev_display, 

287 display_path(dest), 

288 ) 

289 if verbosity <= 0: 

290 flags = ["--quiet"] 

291 else: 

292 flags = [] 

293 cmd_args = make_command( 

294 "checkout", 

295 *flags, 

296 self.get_remote_call_options(), 

297 rev_options.to_args(), 

298 url, 

299 dest, 

300 ) 

301 self.run_command(cmd_args) 

302 

303 def switch( 

304 self, 

305 dest: str, 

306 url: HiddenText, 

307 rev_options: RevOptions, 

308 verbosity: int = 0, 

309 ) -> None: 

310 cmd_args = make_command( 

311 "switch", 

312 self.get_remote_call_options(), 

313 rev_options.to_args(), 

314 url, 

315 dest, 

316 ) 

317 self.run_command(cmd_args) 

318 

319 def update( 

320 self, 

321 dest: str, 

322 url: HiddenText, 

323 rev_options: RevOptions, 

324 verbosity: int = 0, 

325 ) -> None: 

326 cmd_args = make_command( 

327 "update", 

328 self.get_remote_call_options(), 

329 rev_options.to_args(), 

330 dest, 

331 ) 

332 self.run_command(cmd_args) 

333 

334 

335vcs.register(Subversion)