Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/vcs/subversion.py: 31%

156 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import logging 

2import os 

3import re 

4from typing import List, Optional, Tuple 

5 

6from pip._internal.utils.misc import ( 

7 HiddenText, 

8 display_path, 

9 is_console_interactive, 

10 is_installable_dir, 

11 split_auth_from_netloc, 

12) 

13from pip._internal.utils.subprocess import CommandArgs, make_command 

14from pip._internal.vcs.versioncontrol import ( 

15 AuthInfo, 

16 RemoteNotFoundError, 

17 RevOptions, 

18 VersionControl, 

19 vcs, 

20) 

21 

22logger = logging.getLogger(__name__) 

23 

24_svn_xml_url_re = re.compile('url="([^"]+)"') 

25_svn_rev_re = re.compile(r'committed-rev="(\d+)"') 

26_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"') 

27_svn_info_xml_url_re = re.compile(r"<url>(.*)</url>") 

28 

29 

30class Subversion(VersionControl): 

31 name = "svn" 

32 dirname = ".svn" 

33 repo_name = "checkout" 

34 schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file") 

35 

36 @classmethod 

37 def should_add_vcs_url_prefix(cls, remote_url: str) -> bool: 

38 return True 

39 

40 @staticmethod 

41 def get_base_rev_args(rev: str) -> List[str]: 

42 return ["-r", rev] 

43 

44 @classmethod 

45 def get_revision(cls, location: str) -> str: 

46 """ 

47 Return the maximum revision for all files under a given location 

48 """ 

49 # Note: taken from setuptools.command.egg_info 

50 revision = 0 

51 

52 for base, dirs, _ in os.walk(location): 

53 if cls.dirname not in dirs: 

54 dirs[:] = [] 

55 continue # no sense walking uncontrolled subdirs 

56 dirs.remove(cls.dirname) 

57 entries_fn = os.path.join(base, cls.dirname, "entries") 

58 if not os.path.exists(entries_fn): 

59 # FIXME: should we warn? 

60 continue 

61 

62 dirurl, localrev = cls._get_svn_url_rev(base) 

63 

64 if base == location: 

65 assert dirurl is not None 

66 base = dirurl + "/" # save the root url 

67 elif not dirurl or not dirurl.startswith(base): 

68 dirs[:] = [] 

69 continue # not part of the same svn tree, skip it 

70 revision = max(revision, localrev) 

71 return str(revision) 

72 

73 @classmethod 

74 def get_netloc_and_auth( 

75 cls, netloc: str, scheme: str 

76 ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]: 

77 """ 

78 This override allows the auth information to be passed to svn via the 

79 --username and --password options instead of via the URL. 

80 """ 

81 if scheme == "ssh": 

82 # The --username and --password options can't be used for 

83 # svn+ssh URLs, so keep the auth information in the URL. 

84 return super().get_netloc_and_auth(netloc, scheme) 

85 

86 return split_auth_from_netloc(netloc) 

87 

88 @classmethod 

89 def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]: 

90 # hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it 

91 url, rev, user_pass = super().get_url_rev_and_auth(url) 

92 if url.startswith("ssh://"): 

93 url = "svn+" + url 

94 return url, rev, user_pass 

95 

96 @staticmethod 

97 def make_rev_args( 

98 username: Optional[str], password: Optional[HiddenText] 

99 ) -> CommandArgs: 

100 extra_args: CommandArgs = [] 

101 if username: 

102 extra_args += ["--username", username] 

103 if password: 

104 extra_args += ["--password", password] 

105 

106 return extra_args 

107 

108 @classmethod 

109 def get_remote_url(cls, location: str) -> str: 

110 # In cases where the source is in a subdirectory, we have to look up in 

111 # the location until we find a valid project root. 

112 orig_location = location 

113 while not is_installable_dir(location): 

114 last_location = location 

115 location = os.path.dirname(location) 

116 if location == last_location: 

117 # We've traversed up to the root of the filesystem without 

118 # finding a Python project. 

119 logger.warning( 

120 "Could not find Python project for directory %s (tried all " 

121 "parent directories)", 

122 orig_location, 

123 ) 

124 raise RemoteNotFoundError 

125 

126 url, _rev = cls._get_svn_url_rev(location) 

127 if url is None: 

128 raise RemoteNotFoundError 

129 

130 return url 

131 

132 @classmethod 

133 def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]: 

134 from pip._internal.exceptions import InstallationError 

135 

136 entries_path = os.path.join(location, cls.dirname, "entries") 

137 if os.path.exists(entries_path): 

138 with open(entries_path) as f: 

139 data = f.read() 

140 else: # subversion >= 1.7 does not have the 'entries' file 

141 data = "" 

142 

143 url = None 

144 if data.startswith("8") or data.startswith("9") or data.startswith("10"): 

145 entries = list(map(str.splitlines, data.split("\n\x0c\n"))) 

146 del entries[0][0] # get rid of the '8' 

147 url = entries[0][3] 

148 revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0] 

149 elif data.startswith("<?xml"): 

150 match = _svn_xml_url_re.search(data) 

151 if not match: 

152 raise ValueError(f"Badly formatted data: {data!r}") 

153 url = match.group(1) # get repository URL 

154 revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0] 

155 else: 

156 try: 

157 # subversion >= 1.7 

158 # Note that using get_remote_call_options is not necessary here 

159 # because `svn info` is being run against a local directory. 

160 # We don't need to worry about making sure interactive mode 

161 # is being used to prompt for passwords, because passwords 

162 # are only potentially needed for remote server requests. 

163 xml = cls.run_command( 

164 ["info", "--xml", location], 

165 show_stdout=False, 

166 stdout_only=True, 

167 ) 

168 match = _svn_info_xml_url_re.search(xml) 

169 assert match is not None 

170 url = match.group(1) 

171 revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)] 

172 except InstallationError: 

173 url, revs = None, [] 

174 

175 if revs: 

176 rev = max(revs) 

177 else: 

178 rev = 0 

179 

180 return url, rev 

181 

182 @classmethod 

183 def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool: 

184 """Always assume the versions don't match""" 

185 return False 

186 

187 def __init__(self, use_interactive: Optional[bool] = None) -> None: 

188 if use_interactive is None: 

189 use_interactive = is_console_interactive() 

190 self.use_interactive = use_interactive 

191 

192 # This member is used to cache the fetched version of the current 

193 # ``svn`` client. 

194 # Special value definitions: 

195 # None: Not evaluated yet. 

196 # Empty tuple: Could not parse version. 

197 self._vcs_version: Optional[Tuple[int, ...]] = None 

198 

199 super().__init__() 

200 

201 def call_vcs_version(self) -> Tuple[int, ...]: 

202 """Query the version of the currently installed Subversion client. 

203 

204 :return: A tuple containing the parts of the version information or 

205 ``()`` if the version returned from ``svn`` could not be parsed. 

206 :raises: BadCommand: If ``svn`` is not installed. 

207 """ 

208 # Example versions: 

209 # svn, version 1.10.3 (r1842928) 

210 # compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0 

211 # svn, version 1.7.14 (r1542130) 

212 # compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu 

213 # svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0) 

214 # compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2 

215 version_prefix = "svn, version " 

216 version = self.run_command(["--version"], show_stdout=False, stdout_only=True) 

217 if not version.startswith(version_prefix): 

218 return () 

219 

220 version = version[len(version_prefix) :].split()[0] 

221 version_list = version.partition("-")[0].split(".") 

222 try: 

223 parsed_version = tuple(map(int, version_list)) 

224 except ValueError: 

225 return () 

226 

227 return parsed_version 

228 

229 def get_vcs_version(self) -> Tuple[int, ...]: 

230 """Return the version of the currently installed Subversion client. 

231 

232 If the version of the Subversion client has already been queried, 

233 a cached value will be used. 

234 

235 :return: A tuple containing the parts of the version information or 

236 ``()`` if the version returned from ``svn`` could not be parsed. 

237 :raises: BadCommand: If ``svn`` is not installed. 

238 """ 

239 if self._vcs_version is not None: 

240 # Use cached version, if available. 

241 # If parsing the version failed previously (empty tuple), 

242 # do not attempt to parse it again. 

243 return self._vcs_version 

244 

245 vcs_version = self.call_vcs_version() 

246 self._vcs_version = vcs_version 

247 return vcs_version 

248 

249 def get_remote_call_options(self) -> CommandArgs: 

250 """Return options to be used on calls to Subversion that contact the server. 

251 

252 These options are applicable for the following ``svn`` subcommands used 

253 in this class. 

254 

255 - checkout 

256 - switch 

257 - update 

258 

259 :return: A list of command line arguments to pass to ``svn``. 

260 """ 

261 if not self.use_interactive: 

262 # --non-interactive switch is available since Subversion 0.14.4. 

263 # Subversion < 1.8 runs in interactive mode by default. 

264 return ["--non-interactive"] 

265 

266 svn_version = self.get_vcs_version() 

267 # By default, Subversion >= 1.8 runs in non-interactive mode if 

268 # stdin is not a TTY. Since that is how pip invokes SVN, in 

269 # call_subprocess(), pip must pass --force-interactive to ensure 

270 # the user can be prompted for a password, if required. 

271 # SVN added the --force-interactive option in SVN 1.8. Since 

272 # e.g. RHEL/CentOS 7, which is supported until 2024, ships with 

273 # SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip 

274 # can't safely add the option if the SVN version is < 1.8 (or unknown). 

275 if svn_version >= (1, 8): 

276 return ["--force-interactive"] 

277 

278 return [] 

279 

280 def fetch_new( 

281 self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int 

282 ) -> None: 

283 rev_display = rev_options.to_display() 

284 logger.info( 

285 "Checking out %s%s to %s", 

286 url, 

287 rev_display, 

288 display_path(dest), 

289 ) 

290 if verbosity <= 0: 

291 flag = "--quiet" 

292 else: 

293 flag = "" 

294 cmd_args = make_command( 

295 "checkout", 

296 flag, 

297 self.get_remote_call_options(), 

298 rev_options.to_args(), 

299 url, 

300 dest, 

301 ) 

302 self.run_command(cmd_args) 

303 

304 def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: 

305 cmd_args = make_command( 

306 "switch", 

307 self.get_remote_call_options(), 

308 rev_options.to_args(), 

309 url, 

310 dest, 

311 ) 

312 self.run_command(cmd_args) 

313 

314 def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: 

315 cmd_args = make_command( 

316 "update", 

317 self.get_remote_call_options(), 

318 rev_options.to_args(), 

319 dest, 

320 ) 

321 self.run_command(cmd_args) 

322 

323 

324vcs.register(Subversion)