Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/req/req_file.py: 80%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

276 statements  

1""" 

2Requirements file parsing 

3""" 

4 

5from __future__ import annotations 

6 

7import codecs 

8import locale 

9import logging 

10import optparse 

11import os 

12import re 

13import shlex 

14import sys 

15import urllib.parse 

16from collections.abc import Generator, Iterable 

17from dataclasses import dataclass 

18from optparse import Values 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 Callable, 

23 NoReturn, 

24) 

25 

26from pip._internal.cli import cmdoptions 

27from pip._internal.exceptions import InstallationError, RequirementsFileParseError 

28from pip._internal.models.release_control import ReleaseControl 

29from pip._internal.models.search_scope import SearchScope 

30 

31if TYPE_CHECKING: 

32 from pip._internal.index.package_finder import PackageFinder 

33 from pip._internal.network.session import PipSession 

34 

# Names exported by ``from ... import *``.
__all__ = ["parse_requirements"]

# (line number, line text) pairs as passed between the preprocessing stages.
ReqFileLines = Iterable[tuple[int, str]]

# Callable turning one preprocessed line into (args string, parsed options).
LineParser = Callable[[str], tuple[str, Values]]

# Matches a leading http:, https: or file: scheme, case-insensitively.
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# Matches a '#' comment that starts the line or follows whitespace, through
# the end of the line.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")

# Option factories registered on the per-line parser (see build_parser).
SUPPORTED_OPTIONS: list[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.all_releases,
    cmdoptions.only_final,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# options to be passed to requirements
SUPPORTED_OPTIONS_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# options to be passed to editable requirements
SUPPORTED_OPTIONS_EDITABLE_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.config_settings,
]


# the 'dest' string values
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
    str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ
]

# order of BOMS is important: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE
# so data.startswith(BOM_UTF16_LE) would be true for UTF32_LE data
BOMS: list[tuple[bytes, str]] = [
    (codecs.BOM_UTF8, "utf-8"),
    (codecs.BOM_UTF32, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32-be"),
    (codecs.BOM_UTF32_LE, "utf-32-le"),
    (codecs.BOM_UTF16, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16-be"),
    (codecs.BOM_UTF16_LE, "utf-16-le"),
]

# Bytes pattern capturing the encoding name of a "coding:" / "coding="
# declaration; applied to the leading comment lines of a requirements file.
PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")
# Encoding tried first when a file carries no BOM and no coding declaration.
DEFAULT_ENCODING = "utf-8"

# Module-level logger.
logger = logging.getLogger(__name__)

102 

103 

@dataclass(frozen=True, slots=True)
class ParsedRequirement:
    """Immutable record describing one requirement read from a file."""

    # Requirement text taken from the line (its args, or the editable value).
    requirement: str
    # True when the line carried an editable option.
    is_editable: bool
    # Origin string of the form "-r <file> (line <n>)" or "-c <file> (line <n>)".
    comes_from: str
    # True when the line was read from a constraints file.
    constraint: bool
    # Per-requirement option values keyed by option ``dest``; may be None.
    options: dict[str, Any] | None
    # Description of the form "line <n> of <file>"; may be None.
    line_source: str | None

112 

113 

114@dataclass(frozen=True, slots=True) 

115class ParsedLine: 

116 filename: str 

117 lineno: int 

118 args: str 

119 opts: Values 

120 constraint: bool 

121 

122 @property 

123 def is_editable(self) -> bool: 

124 return bool(self.opts.editables) 

125 

126 @property 

127 def requirement(self) -> str | None: 

128 if self.args: 

129 return self.args 

130 elif self.is_editable: 

131 # We don't support multiple -e on one line 

132 return self.opts.editables[0] 

133 return None 

134 

135 

def parse_requirements(
    filename: str,
    session: PipSession,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily yield the requirements found in a requirements file.

    Option-only lines are applied to *finder*, *options* and *session* as a
    side effect and produce no output.

    :param filename: Path or url of requirements file.
    :param session: PipSession instance.
    :param finder: Instance of pip.index.PackageFinder.
    :param options: cli options.
    :param constraint: If true, parsing a constraint file rather than
        requirements file.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for entry in file_parser.parse(filename, constraint):
        requirement = handle_line(
            entry, options=options, finder=finder, session=session
        )
        if requirement is None:
            # Option-only line: nothing to hand back to the caller.
            continue
        yield requirement

161 

162 

def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into an iterator of cleaned, numbered lines.

    Lines are numbered from 1, then passed in order through continuation
    joining, comment removal and environment-variable expansion.

    :param content: the content of the requirements file
    """
    numbered: ReqFileLines = enumerate(content.splitlines(), start=1)
    for stage in (join_lines, ignore_comments, expand_env_variables):
        numbered = stage(numbered)
    return numbered

173 

174 

def handle_requirement_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
) -> ParsedRequirement:
    """Build the ParsedRequirement for a line that carries a requirement.

    :param line: Parsed line whose ``requirement`` is not None.
    :param options: Accepted for signature compatibility; not read here.
    """
    # preserve for the nested code path
    flag = "-c" if line.constraint else "-r"
    origin = f"{flag} {line.filename} (line {line.lineno})"

    assert line.requirement is not None

    # Editable and regular requirements honour different per-line options.
    allowed_dests = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )
    parsed_values = vars(line.opts)
    # Keep only the supported options that were given a truthy value.
    per_requirement = {}
    for dest in allowed_dests:
        value = parsed_values.get(dest)
        if value:
            per_requirement[dest] = value

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=origin,
        constraint=line.constraint,
        options=per_requirement,
        line_source=f"line {line.lineno} of {line.filename}",
    )

207 

208 

def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    session: PipSession | None = None,
) -> None:
    """Apply an option-only line to the CLI options, finder and session.

    :param opts: Options parsed from the line.
    :param filename: File the line came from (used in messages and to resolve
        a relative find-links path).
    :param lineno: Line number within ``filename``.
    :param finder: Finder whose search scope and flags are updated, if given.
    :param options: CLI options that receive ``require_hashes`` and enabled
        features, if given.
    :param session: Session that receives index URLs and trusted hosts, if given.
    """
    # --hash only makes sense next to a requirement; warn and carry on.
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            # Only add features that are not already enabled.
            options.features_enabled.extend(
                f for f in opts.features_enabled if f not in options.features_enabled
            )

    # set finder options
    if finder:
        # Start from the finder's current settings and layer this line on top.
        find_links = finder.find_links
        index_urls = finder.index_urls
        no_index = finder.search_scope.no_index
        if opts.no_index is True:
            no_index = True
            index_urls = []
        # An index URL replaces the list; extra index URLs are appended.
        # Both are ignored once no_index is in effect.
        if opts.index_url and not no_index:
            index_urls = [opts.index_url]
        if opts.extra_index_urls and not no_index:
            index_urls.extend(opts.extra_index_urls)
        if opts.find_links:
            # FIXME: it would be nice to keep track of the source
            # of the find_links: support a find-links local path
            # relative to a requirements file.
            # Only the first find-links value on the line is used.
            value = opts.find_links[0]
            req_dir = os.path.dirname(os.path.abspath(filename))
            relative_to_reqs_file = os.path.join(req_dir, value)
            # Prefer the path relative to the requirements file when it exists.
            if os.path.exists(relative_to_reqs_file):
                value = relative_to_reqs_file
            find_links.append(value)

        if session:
            # We need to update the auth urls in session
            session.update_index_urls(index_urls)

        # Replace the finder's search scope with the updated values.
        search_scope = SearchScope(
            find_links=find_links,
            index_urls=index_urls,
            no_index=no_index,
        )
        finder.search_scope = search_scope

        # Transform --pre into --all-releases :all:
        if opts.pre:
            if not opts.release_control:
                opts.release_control = ReleaseControl()
            opts.release_control.all_releases.add(":all:")

        if opts.release_control:
            # An existing release control on the finder is left untouched.
            if not finder.release_control:
                # First time seeing release_control, set it on finder
                finder.set_release_control(opts.release_control)

        if opts.prefer_binary:
            finder.set_prefer_binary()

        if session:
            # Record where each trusted host was declared.
            for host in opts.trusted_hosts or []:
                source = f"line {lineno} of {filename}"
                session.add_trusted_host(host, source=source)

285 

286 

def handle_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
    finder: PackageFinder | None = None,
    session: PipSession | None = None,
) -> ParsedRequirement | None:
    """Dispatch one parsed line to the requirement or the option handler.

    :param line: The parsed line to be processed.
    :param options: CLI options.
    :param finder: The finder - updated by non-requirement lines.
    :param session: The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    On a requirement line only the options from SUPPORTED_OPTIONS_REQ take
    effect, and they are scoped to that requirement; options from
    SUPPORTED_OPTIONS may be present but are ignored.

    On a line without a requirement only the options from SUPPORTED_OPTIONS
    take effect; options from SUPPORTED_OPTIONS_REQ may be present but are
    ignored. Such a line may hold several options (although the docs imply
    only one is supported); all of them are parsed and affect the finder.
    """
    if line.requirement is None:
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)

329 

330 

class RequirementsFileParser:
    """Reads a requirements file and follows nested -r / -c includes."""

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        # The top-level file has no including file, hence the None value.
        seen: dict[str, str | None] = {os.path.abspath(filename): None}
        yield from self._parse_and_recurse(filename, constraint, [seen])

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: list[dict[str, str | None]],
    ) -> Generator[ParsedLine, None, None]:
        """Yield the lines of *filename*, descending into nested files.

        ``parsed_files_stack[0]`` maps every file on the current include
        chain to the file that included it; it is used to detect cycles.
        """
        for line in self._parse_file(filename, constraint):
            is_include = line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            )
            if not is_include:
                yield line
                continue

            # parse a nested requirements file
            if line.opts.requirements:
                req_path, nested_constraint = line.opts.requirements[0], False
            else:
                req_path, nested_constraint = line.opts.constraints[0], True

            if SCHEME_RE.search(filename):
                # original file is over http:
                # do a url join so relative paths work
                req_path = urllib.parse.urljoin(filename, req_path)
            elif not SCHEME_RE.search(req_path):
                # original file and nested file are paths:
                # do a join so relative paths work
                # and then abspath so that we can identify recursive references
                joined = os.path.join(os.path.dirname(filename), req_path)
                req_path = os.path.abspath(joined)

            parsed_files = parsed_files_stack[0]
            if req_path in parsed_files:
                initial_file = parsed_files[req_path]
                if initial_file is None:
                    tail = ""
                else:
                    tail = f" and again in {initial_file}"
                raise RequirementsFileParseError(
                    f"{req_path} recursively references itself in {filename}{tail}"
                )

            # Keeping a track where was each file first included in
            new_parsed_files = {**parsed_files, req_path: filename}
            yield from self._parse_and_recurse(
                req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
            )

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Fetch *filename*, preprocess it and yield one ParsedLine per line."""
        _, content = get_file_content(filename, self._session, constraint=constraint)

        for line_number, line in preprocess(content):
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg)

            yield ParsedLine(
                filename=filename,
                lineno=line_number,
                args=args_str,
                opts=opts,
                constraint=constraint,
            )

422 

423 

def get_line_parser(finder: PackageFinder | None) -> LineParser:
    """Return a function that splits one line into (args, parsed options).

    When *finder* is given, its format and release controls seed the option
    defaults used for every line.
    """

    def parse_line(line: str) -> tuple[str, Values]:
        # A new parser per line: a parser accumulates appendable options.
        option_parser = build_parser()
        seed = option_parser.get_default_values()
        seed.index_url = None
        if finder:
            seed.format_control = finder.format_control
            seed.release_control = finder.release_control

        requirement_part, option_part = break_args_options(line)

        try:
            argv = shlex.split(option_part)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {option_part}") from e

        parsed_opts = option_parser.parse_args(argv, seed)[0]
        return requirement_part, parsed_opts

    return parse_line

447 

448 

def break_args_options(line: str) -> tuple[str, str]:
    """Split a line into its args string and its options string.

    Only the options are later passed through shlex and optparse; the args
    may contain markers, which shlex would corrupt.

    The line is split on single spaces. Everything before the first token
    that starts with ``-`` is the args part, and that token and everything
    after it is the options part. Either part may be empty.

    :param line: A preprocessed requirements-file line.
    :return: ``(args, options)``, each re-joined with single spaces.
    """
    tokens = line.split(" ")
    # Index of the first option-like token; "-" also covers "--" prefixes.
    # Slicing once avoids popping from the front of a list for every token.
    first_option = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:first_option]), " ".join(tokens[first_option:])

464 

465 

class OptionParsingError(Exception):
    """Raised when the options part of a requirements line cannot be parsed."""

    def __init__(self, msg: str) -> None:
        super().__init__(msg)
        # Kept as an attribute so callers can embed it in their own message.
        self.msg = msg

469 

470 

def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines
    """
    parser = optparse.OptionParser(add_help_option=False)

    # Register the file-level options followed by the per-requirement ones.
    for make_option in (*SUPPORTED_OPTIONS, *SUPPORTED_OPTIONS_REQ):
        parser.add_option(make_option())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    def parser_exit(self: Any, msg: str) -> NoReturn:
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    # https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser

492 

493 

def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    r"""Join continuation lines (lines ending in '\') with the line after them.

    A line that is entirely a comment never continues, even if it ends in a
    backslash. A joined line takes on the line number of its first physical
    line.

    :param lines_enum: (line number, text) pairs in file order.
    :return: (line number, text) pairs with continuations merged.
    """
    primary_line_number = None
    pending: list[str] = []
    for line_number, line in lines_enum:
        # Evaluate the comment test once; it is needed by both branches.
        is_comment = COMMENT_RE.match(line) is not None

        if line.endswith("\\") and not is_comment:
            if not pending:
                primary_line_number = line_number
            # Drop only the trailing continuation marker. Stripping both ends
            # would also eat leading backslashes, e.g. of a UNC path.
            pending.append(line.rstrip("\\"))
            continue

        if is_comment:
            # this ensures comments are always matched later
            line = " " + line
        if pending:
            pending.append(line)
            assert primary_line_number is not None
            yield primary_line_number, "".join(pending)
            pending = []
        else:
            yield line_number, line

    # last line contains \
    if pending:
        assert primary_line_number is not None
        yield primary_line_number, "".join(pending)

    # TODO: handle space after '\'.

523 

524 

def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Remove comments and surrounding whitespace; drop lines left empty.
    """
    for line_number, raw in lines_enum:
        cleaned = COMMENT_RE.sub("", raw).strip()
        if not cleaned:
            continue
        yield line_number, cleaned

534 

535 

def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute `${NAME}` references with values from the environment.

    `${MY_VARIABLE_1}` is the only accepted form, for two reasons:

    1. Strings that merely contain a `$` are not accidentally (partially)
       expanded.
    2. Requirement files behave the same on every platform.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).

    A reference whose variable is unset or empty is left in the line as-is.
    """
    for line_number, line in lines_enum:
        # The iterator scans the text as it was before any substitution.
        for reference in ENV_VAR_RE.finditer(line):
            replacement = os.getenv(reference.group("name"))
            if replacement:
                line = line.replace(reference.group("var"), replacement)

        yield line_number, line

561 

562 

def get_file_content(
    url: str, session: PipSession, *, constraint: bool = False
) -> tuple[str, str]:
    """Gets the content of a file; it may be a filename, file: URL, or
    http: URL.  Returns (location, content).  Content is unicode.
    Respects # -*- coding: declarations on the retrieved files.

    :param url:         File path or url.
    :param session:     PipSession instance.
    :param constraint:  If true, the file is a constraint file; this only
                        affects the wording of the error raised when the
                        file cannot be opened.
    :raises InstallationError: If a local file cannot be opened or read.
    """
    scheme = urllib.parse.urlsplit(url).scheme
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if scheme in ("http", "https", "file"):
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as f:
            raw_content = f.read()
    except OSError as exc:
        kind = "constraint" if constraint else "requirements"
        # Chain explicitly so the underlying OS error is reported as the cause.
        raise InstallationError(f"Could not open {kind} file: {exc}") from exc

    return url, _decode_req_file(raw_content, url)

594 

595 

def _decode_req_file(data: bytes, url: str) -> str:
    """Decode the raw bytes of a requirements file to text.

    The encoding is chosen in this order:

    1. A byte order mark listed in ``BOMS``; the mark itself is removed.
    2. A PEP 263 style ``coding`` declaration in a comment on one of the
       first two lines.
    3. ``DEFAULT_ENCODING``.
    4. The locale's preferred encoding (or the interpreter default), with a
       warning, when step 3 fails to decode.

    :param data: Raw file content.
    :param url: Where the data came from; used in the warning message only.
    :return: The decoded text.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    # Only the first two lines can hold a declaration, so stop splitting
    # after them instead of breaking up the whole payload.
    for line in data.split(b"\n", 2)[:2]:
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.groups()[0].decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        # Use the module logger, like the rest of this file, rather than
        # the root logger.
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)