Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_file.py: 81%

232 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1""" 

2Requirements file parsing 

3""" 

4 

5import logging 

6import optparse 

7import os 

8import re 

9import shlex 

10import urllib.parse 

11from optparse import Values 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Callable, 

16 Dict, 

17 Generator, 

18 Iterable, 

19 List, 

20 Optional, 

21 Tuple, 

22) 

23 

24from pip._internal.cli import cmdoptions 

25from pip._internal.exceptions import InstallationError, RequirementsFileParseError 

26from pip._internal.models.search_scope import SearchScope 

27from pip._internal.network.session import PipSession 

28from pip._internal.network.utils import raise_for_status 

29from pip._internal.utils.encoding import auto_decode 

30from pip._internal.utils.urls import get_url_scheme 

31 

32if TYPE_CHECKING: 

33 # NoReturn introduced in 3.6.2; imported only for type checking to maintain 

34 # pip compatibility with older patch versions of Python 3.6 

35 from typing import NoReturn 

36 

37 from pip._internal.index.package_finder import PackageFinder 

38 

39__all__ = ["parse_requirements"] 

40 

41ReqFileLines = Iterable[Tuple[int, str]] 

42 

43LineParser = Callable[[str], Tuple[str, Values]] 

44 

45SCHEME_RE = re.compile(r"^(http|https|file):", re.I) 

46COMMENT_RE = re.compile(r"(^|\s+)#.*$") 

47 

48# Matches environment variable-style values in '${MY_VARIABLE_1}' with the 

49# variable name consisting of only uppercase letters, digits or the '_' 

50# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1, 

51# 2013 Edition. 

52ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})") 

53 

54SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [ 

55 cmdoptions.index_url, 

56 cmdoptions.extra_index_url, 

57 cmdoptions.no_index, 

58 cmdoptions.constraints, 

59 cmdoptions.requirements, 

60 cmdoptions.editable, 

61 cmdoptions.find_links, 

62 cmdoptions.no_binary, 

63 cmdoptions.only_binary, 

64 cmdoptions.prefer_binary, 

65 cmdoptions.require_hashes, 

66 cmdoptions.pre, 

67 cmdoptions.trusted_host, 

68 cmdoptions.use_new_feature, 

69] 

70 

71# options to be passed to requirements 

72SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [ 

73 cmdoptions.global_options, 

74 cmdoptions.hash, 

75 cmdoptions.config_settings, 

76] 

77 

78# the 'dest' string values 

79SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ] 

80 

81logger = logging.getLogger(__name__) 

82 

83 

84class ParsedRequirement: 

85 def __init__( 

86 self, 

87 requirement: str, 

88 is_editable: bool, 

89 comes_from: str, 

90 constraint: bool, 

91 options: Optional[Dict[str, Any]] = None, 

92 line_source: Optional[str] = None, 

93 ) -> None: 

94 self.requirement = requirement 

95 self.is_editable = is_editable 

96 self.comes_from = comes_from 

97 self.options = options 

98 self.constraint = constraint 

99 self.line_source = line_source 

100 

101 

102class ParsedLine: 

103 def __init__( 

104 self, 

105 filename: str, 

106 lineno: int, 

107 args: str, 

108 opts: Values, 

109 constraint: bool, 

110 ) -> None: 

111 self.filename = filename 

112 self.lineno = lineno 

113 self.opts = opts 

114 self.constraint = constraint 

115 

116 if args: 

117 self.is_requirement = True 

118 self.is_editable = False 

119 self.requirement = args 

120 elif opts.editables: 

121 self.is_requirement = True 

122 self.is_editable = True 

123 # We don't support multiple -e on one line 

124 self.requirement = opts.editables[0] 

125 else: 

126 self.is_requirement = False 

127 

128 

129def parse_requirements( 

130 filename: str, 

131 session: PipSession, 

132 finder: Optional["PackageFinder"] = None, 

133 options: Optional[optparse.Values] = None, 

134 constraint: bool = False, 

135) -> Generator[ParsedRequirement, None, None]: 

136 """Parse a requirements file and yield ParsedRequirement instances. 

137 

138 :param filename: Path or url of requirements file. 

139 :param session: PipSession instance. 

140 :param finder: Instance of pip.index.PackageFinder. 

141 :param options: cli options. 

142 :param constraint: If true, parsing a constraint file rather than 

143 requirements file. 

144 """ 

145 line_parser = get_line_parser(finder) 

146 parser = RequirementsFileParser(session, line_parser) 

147 

148 for parsed_line in parser.parse(filename, constraint): 

149 parsed_req = handle_line( 

150 parsed_line, options=options, finder=finder, session=session 

151 ) 

152 if parsed_req is not None: 

153 yield parsed_req 

154 

155 

156def preprocess(content: str) -> ReqFileLines: 

157 """Split, filter, and join lines, and return a line iterator 

158 

159 :param content: the content of the requirements file 

160 """ 

161 lines_enum: ReqFileLines = enumerate(content.splitlines(), start=1) 

162 lines_enum = join_lines(lines_enum) 

163 lines_enum = ignore_comments(lines_enum) 

164 lines_enum = expand_env_variables(lines_enum) 

165 return lines_enum 

166 

167 

168def handle_requirement_line( 

169 line: ParsedLine, 

170 options: Optional[optparse.Values] = None, 

171) -> ParsedRequirement: 

172 # preserve for the nested code path 

173 line_comes_from = "{} {} (line {})".format( 

174 "-c" if line.constraint else "-r", 

175 line.filename, 

176 line.lineno, 

177 ) 

178 

179 assert line.is_requirement 

180 

181 if line.is_editable: 

182 # For editable requirements, we don't support per-requirement 

183 # options, so just return the parsed requirement. 

184 return ParsedRequirement( 

185 requirement=line.requirement, 

186 is_editable=line.is_editable, 

187 comes_from=line_comes_from, 

188 constraint=line.constraint, 

189 ) 

190 else: 

191 # get the options that apply to requirements 

192 req_options = {} 

193 for dest in SUPPORTED_OPTIONS_REQ_DEST: 

194 if dest in line.opts.__dict__ and line.opts.__dict__[dest]: 

195 req_options[dest] = line.opts.__dict__[dest] 

196 

197 line_source = f"line {line.lineno} of {line.filename}" 

198 return ParsedRequirement( 

199 requirement=line.requirement, 

200 is_editable=line.is_editable, 

201 comes_from=line_comes_from, 

202 constraint=line.constraint, 

203 options=req_options, 

204 line_source=line_source, 

205 ) 

206 

207 

208def handle_option_line( 

209 opts: Values, 

210 filename: str, 

211 lineno: int, 

212 finder: Optional["PackageFinder"] = None, 

213 options: Optional[optparse.Values] = None, 

214 session: Optional[PipSession] = None, 

215) -> None: 

216 if opts.hashes: 

217 logger.warning( 

218 "%s line %s has --hash but no requirement, and will be ignored.", 

219 filename, 

220 lineno, 

221 ) 

222 

223 if options: 

224 # percolate options upward 

225 if opts.require_hashes: 

226 options.require_hashes = opts.require_hashes 

227 if opts.features_enabled: 

228 options.features_enabled.extend( 

229 f for f in opts.features_enabled if f not in options.features_enabled 

230 ) 

231 

232 # set finder options 

233 if finder: 

234 find_links = finder.find_links 

235 index_urls = finder.index_urls 

236 no_index = finder.search_scope.no_index 

237 if opts.no_index is True: 

238 no_index = True 

239 index_urls = [] 

240 if opts.index_url and not no_index: 

241 index_urls = [opts.index_url] 

242 if opts.extra_index_urls and not no_index: 

243 index_urls.extend(opts.extra_index_urls) 

244 if opts.find_links: 

245 # FIXME: it would be nice to keep track of the source 

246 # of the find_links: support a find-links local path 

247 # relative to a requirements file. 

248 value = opts.find_links[0] 

249 req_dir = os.path.dirname(os.path.abspath(filename)) 

250 relative_to_reqs_file = os.path.join(req_dir, value) 

251 if os.path.exists(relative_to_reqs_file): 

252 value = relative_to_reqs_file 

253 find_links.append(value) 

254 

255 if session: 

256 # We need to update the auth urls in session 

257 session.update_index_urls(index_urls) 

258 

259 search_scope = SearchScope( 

260 find_links=find_links, 

261 index_urls=index_urls, 

262 no_index=no_index, 

263 ) 

264 finder.search_scope = search_scope 

265 

266 if opts.pre: 

267 finder.set_allow_all_prereleases() 

268 

269 if opts.prefer_binary: 

270 finder.set_prefer_binary() 

271 

272 if session: 

273 for host in opts.trusted_hosts or []: 

274 source = f"line {lineno} of {filename}" 

275 session.add_trusted_host(host, source=source) 

276 

277 

278def handle_line( 

279 line: ParsedLine, 

280 options: Optional[optparse.Values] = None, 

281 finder: Optional["PackageFinder"] = None, 

282 session: Optional[PipSession] = None, 

283) -> Optional[ParsedRequirement]: 

284 """Handle a single parsed requirements line; This can result in 

285 creating/yielding requirements, or updating the finder. 

286 

287 :param line: The parsed line to be processed. 

288 :param options: CLI options. 

289 :param finder: The finder - updated by non-requirement lines. 

290 :param session: The session - updated by non-requirement lines. 

291 

292 Returns a ParsedRequirement object if the line is a requirement line, 

293 otherwise returns None. 

294 

295 For lines that contain requirements, the only options that have an effect 

296 are from SUPPORTED_OPTIONS_REQ, and they are scoped to the 

297 requirement. Other options from SUPPORTED_OPTIONS may be present, but are 

298 ignored. 

299 

300 For lines that do not contain requirements, the only options that have an 

301 effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may 

302 be present, but are ignored. These lines may contain multiple options 

303 (although our docs imply only one is supported), and all our parsed and 

304 affect the finder. 

305 """ 

306 

307 if line.is_requirement: 

308 parsed_req = handle_requirement_line(line, options) 

309 return parsed_req 

310 else: 

311 handle_option_line( 

312 line.opts, 

313 line.filename, 

314 line.lineno, 

315 finder, 

316 options, 

317 session, 

318 ) 

319 return None 

320 

321 

322class RequirementsFileParser: 

323 def __init__( 

324 self, 

325 session: PipSession, 

326 line_parser: LineParser, 

327 ) -> None: 

328 self._session = session 

329 self._line_parser = line_parser 

330 

331 def parse( 

332 self, filename: str, constraint: bool 

333 ) -> Generator[ParsedLine, None, None]: 

334 """Parse a given file, yielding parsed lines.""" 

335 yield from self._parse_and_recurse(filename, constraint) 

336 

337 def _parse_and_recurse( 

338 self, filename: str, constraint: bool 

339 ) -> Generator[ParsedLine, None, None]: 

340 for line in self._parse_file(filename, constraint): 

341 if not line.is_requirement and ( 

342 line.opts.requirements or line.opts.constraints 

343 ): 

344 # parse a nested requirements file 

345 if line.opts.requirements: 

346 req_path = line.opts.requirements[0] 

347 nested_constraint = False 

348 else: 

349 req_path = line.opts.constraints[0] 

350 nested_constraint = True 

351 

352 # original file is over http 

353 if SCHEME_RE.search(filename): 

354 # do a url join so relative paths work 

355 req_path = urllib.parse.urljoin(filename, req_path) 

356 # original file and nested file are paths 

357 elif not SCHEME_RE.search(req_path): 

358 # do a join so relative paths work 

359 req_path = os.path.join( 

360 os.path.dirname(filename), 

361 req_path, 

362 ) 

363 

364 yield from self._parse_and_recurse(req_path, nested_constraint) 

365 else: 

366 yield line 

367 

368 def _parse_file( 

369 self, filename: str, constraint: bool 

370 ) -> Generator[ParsedLine, None, None]: 

371 _, content = get_file_content(filename, self._session) 

372 

373 lines_enum = preprocess(content) 

374 

375 for line_number, line in lines_enum: 

376 try: 

377 args_str, opts = self._line_parser(line) 

378 except OptionParsingError as e: 

379 # add offending line 

380 msg = f"Invalid requirement: {line}\n{e.msg}" 

381 raise RequirementsFileParseError(msg) 

382 

383 yield ParsedLine( 

384 filename, 

385 line_number, 

386 args_str, 

387 opts, 

388 constraint, 

389 ) 

390 

391 

392def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser: 

393 def parse_line(line: str) -> Tuple[str, Values]: 

394 # Build new parser for each line since it accumulates appendable 

395 # options. 

396 parser = build_parser() 

397 defaults = parser.get_default_values() 

398 defaults.index_url = None 

399 if finder: 

400 defaults.format_control = finder.format_control 

401 

402 args_str, options_str = break_args_options(line) 

403 

404 try: 

405 options = shlex.split(options_str) 

406 except ValueError as e: 

407 raise OptionParsingError(f"Could not split options: {options_str}") from e 

408 

409 opts, _ = parser.parse_args(options, defaults) 

410 

411 return args_str, opts 

412 

413 return parse_line 

414 

415 

416def break_args_options(line: str) -> Tuple[str, str]: 

417 """Break up the line into an args and options string. We only want to shlex 

418 (and then optparse) the options, not the args. args can contain markers 

419 which are corrupted by shlex. 

420 """ 

421 tokens = line.split(" ") 

422 args = [] 

423 options = tokens[:] 

424 for token in tokens: 

425 if token.startswith("-") or token.startswith("--"): 

426 break 

427 else: 

428 args.append(token) 

429 options.pop(0) 

430 return " ".join(args), " ".join(options) 

431 

432 

433class OptionParsingError(Exception): 

434 def __init__(self, msg: str) -> None: 

435 self.msg = msg 

436 

437 

438def build_parser() -> optparse.OptionParser: 

439 """ 

440 Return a parser for parsing requirement lines 

441 """ 

442 parser = optparse.OptionParser(add_help_option=False) 

443 

444 option_factories = SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ 

445 for option_factory in option_factories: 

446 option = option_factory() 

447 parser.add_option(option) 

448 

449 # By default optparse sys.exits on parsing errors. We want to wrap 

450 # that in our own exception. 

451 def parser_exit(self: Any, msg: str) -> "NoReturn": 

452 raise OptionParsingError(msg) 

453 

454 # NOTE: mypy disallows assigning to a method 

455 # https://github.com/python/mypy/issues/2427 

456 parser.exit = parser_exit # type: ignore 

457 

458 return parser 

459 

460 

461def join_lines(lines_enum: ReqFileLines) -> ReqFileLines: 

462 """Joins a line ending in '\' with the previous line (except when following 

463 comments). The joined line takes on the index of the first line. 

464 """ 

465 primary_line_number = None 

466 new_line: List[str] = [] 

467 for line_number, line in lines_enum: 

468 if not line.endswith("\\") or COMMENT_RE.match(line): 

469 if COMMENT_RE.match(line): 

470 # this ensures comments are always matched later 

471 line = " " + line 

472 if new_line: 

473 new_line.append(line) 

474 assert primary_line_number is not None 

475 yield primary_line_number, "".join(new_line) 

476 new_line = [] 

477 else: 

478 yield line_number, line 

479 else: 

480 if not new_line: 

481 primary_line_number = line_number 

482 new_line.append(line.strip("\\")) 

483 

484 # last line contains \ 

485 if new_line: 

486 assert primary_line_number is not None 

487 yield primary_line_number, "".join(new_line) 

488 

489 # TODO: handle space after '\'. 

490 

491 

492def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines: 

493 """ 

494 Strips comments and filter empty lines. 

495 """ 

496 for line_number, line in lines_enum: 

497 line = COMMENT_RE.sub("", line) 

498 line = line.strip() 

499 if line: 

500 yield line_number, line 

501 

502 

503def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines: 

504 """Replace all environment variables that can be retrieved via `os.getenv`. 

505 

506 The only allowed format for environment variables defined in the 

507 requirement file is `${MY_VARIABLE_1}` to ensure two things: 

508 

509 1. Strings that contain a `$` aren't accidentally (partially) expanded. 

510 2. Ensure consistency across platforms for requirement files. 

511 

512 These points are the result of a discussion on the `github pull 

513 request #3514 <https://github.com/pypa/pip/pull/3514>`_. 

514 

515 Valid characters in variable names follow the `POSIX standard 

516 <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited 

517 to uppercase letter, digits and the `_` (underscore). 

518 """ 

519 for line_number, line in lines_enum: 

520 for env_var, var_name in ENV_VAR_RE.findall(line): 

521 value = os.getenv(var_name) 

522 if not value: 

523 continue 

524 

525 line = line.replace(env_var, value) 

526 

527 yield line_number, line 

528 

529 

530def get_file_content(url: str, session: PipSession) -> Tuple[str, str]: 

531 """Gets the content of a file; it may be a filename, file: URL, or 

532 http: URL. Returns (location, content). Content is unicode. 

533 Respects # -*- coding: declarations on the retrieved files. 

534 

535 :param url: File path or url. 

536 :param session: PipSession instance. 

537 """ 

538 scheme = get_url_scheme(url) 

539 

540 # Pip has special support for file:// URLs (LocalFSAdapter). 

541 if scheme in ["http", "https", "file"]: 

542 resp = session.get(url) 

543 raise_for_status(resp) 

544 return resp.url, resp.text 

545 

546 # Assume this is a bare path. 

547 try: 

548 with open(url, "rb") as f: 

549 content = auto_decode(f.read()) 

550 except OSError as exc: 

551 raise InstallationError(f"Could not open requirements file: {exc}") 

552 return url, content