Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/req/req_file.py: 81%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

278 statements  

1""" 

2Requirements file parsing 

3""" 

4 

5from __future__ import annotations 

6 

7import codecs 

8import locale 

9import logging 

10import optparse 

11import os 

12import re 

13import shlex 

14import sys 

15import urllib.parse 

16from collections.abc import Generator, Iterable 

17from dataclasses import dataclass 

18from optparse import Values 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 Callable, 

23 NoReturn, 

24) 

25 

26from pip._internal.cli import cmdoptions 

27from pip._internal.exceptions import InstallationError, RequirementsFileParseError 

28from pip._internal.models.release_control import ReleaseControl 

29from pip._internal.models.search_scope import SearchScope 

30 

31if TYPE_CHECKING: 

32 from pip._internal.index.package_finder import PackageFinder 

33 from pip._internal.network.session import PipSession 

34 

35__all__ = ["parse_requirements"] 

36 

37ReqFileLines = Iterable[tuple[int, str]] 

38 

39LineParser = Callable[[str], tuple[str, Values]] 

40 

41SCHEME_RE = re.compile(r"^(http|https|file):", re.I) 

42COMMENT_RE = re.compile(r"(^|\s+)#.*$") 

43 

44# Matches environment variable-style values in '${MY_VARIABLE_1}' with the 

45# variable name consisting of only uppercase letters, digits or the '_' 

46# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1, 

47# 2013 Edition. 

48ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})") 

49 

50SUPPORTED_OPTIONS: list[Callable[..., optparse.Option]] = [ 

51 cmdoptions.index_url, 

52 cmdoptions.extra_index_url, 

53 cmdoptions.no_index, 

54 cmdoptions.constraints, 

55 cmdoptions.requirements, 

56 cmdoptions.editable, 

57 cmdoptions.find_links, 

58 cmdoptions.no_binary, 

59 cmdoptions.only_binary, 

60 cmdoptions.prefer_binary, 

61 cmdoptions.require_hashes, 

62 cmdoptions.pre, 

63 cmdoptions.all_releases, 

64 cmdoptions.only_final, 

65 cmdoptions.trusted_host, 

66 cmdoptions.use_new_feature, 

67] 

68 

69# options to be passed to requirements 

70SUPPORTED_OPTIONS_REQ: list[Callable[..., optparse.Option]] = [ 

71 cmdoptions.hash, 

72 cmdoptions.config_settings, 

73] 

74 

75SUPPORTED_OPTIONS_EDITABLE_REQ: list[Callable[..., optparse.Option]] = [ 

76 cmdoptions.config_settings, 

77] 

78 

79 

80# the 'dest' string values 

81SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ] 

82SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [ 

83 str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ 

84] 

85 

86# order of BOMS is important: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE 

87# so data.startswith(BOM_UTF16_LE) would be true for UTF32_LE data 

88BOMS: list[tuple[bytes, str]] = [ 

89 (codecs.BOM_UTF8, "utf-8"), 

90 (codecs.BOM_UTF32, "utf-32"), 

91 (codecs.BOM_UTF32_BE, "utf-32-be"), 

92 (codecs.BOM_UTF32_LE, "utf-32-le"), 

93 (codecs.BOM_UTF16, "utf-16"), 

94 (codecs.BOM_UTF16_BE, "utf-16-be"), 

95 (codecs.BOM_UTF16_LE, "utf-16-le"), 

96] 

97 

98PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)") 

99DEFAULT_ENCODING = "utf-8" 

100 

101logger = logging.getLogger(__name__) 

102 

103 

@dataclass(frozen=True)
class ParsedRequirement:
    """A single requirement parsed out of a requirements/constraints file.

    Built by ``handle_requirement_line`` from a ``ParsedLine`` that carries
    a requirement (as opposed to an option-only line).
    """

    # TODO: replace this with slots=True when dropping Python 3.9 support.
    __slots__ = (
        "requirement",
        "is_editable",
        "comes_from",
        "constraint",
        "options",
        "line_source",
    )

    # The requirement text as written on the line (or the -e argument).
    requirement: str
    # True when the line used -e/--editable.
    is_editable: bool
    # Human-readable origin, e.g. "-r requirements.txt (line 3)".
    comes_from: str
    # True when the line came from a constraints (-c) file.
    constraint: bool
    # Per-requirement options (hashes, config settings), if any were given.
    options: dict[str, Any] | None
    # "line N of <file>" origin string, if known.
    line_source: str | None

122 

123 

@dataclass(frozen=True)
class ParsedLine:
    """One logical line of a requirements file after option parsing."""

    __slots__ = ("filename", "lineno", "args", "opts", "constraint")

    filename: str
    lineno: int
    args: str
    opts: Values
    constraint: bool

    @property
    def is_editable(self) -> bool:
        """True when the line carried at least one -e/--editable option."""
        return len(self.opts.editables) > 0

    @property
    def requirement(self) -> str | None:
        """The requirement text on this line, or None for option-only lines."""
        if self.args:
            return self.args
        # Only a single -e per line is supported.
        return self.opts.editables[0] if self.is_editable else None

146 

147 

def parse_requirements(
    filename: str,
    session: PipSession,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Parse a requirements file and yield ParsedRequirement instances.

    :param filename: Path or url of requirements file.
    :param session: PipSession instance.
    :param finder: Instance of pip.index.PackageFinder.
    :param options: cli options.
    :param constraint: If true, parsing a constraint file rather than
        requirements file.
    """
    parser = RequirementsFileParser(session, get_line_parser(finder))

    for line in parser.parse(filename, constraint):
        req = handle_line(line, options=options, finder=finder, session=session)
        if req is not None:
            yield req

173 

174 

def preprocess(content: str) -> ReqFileLines:
    """Turn file content into an iterator of (line_number, line) pairs.

    Continuation lines are joined, comments and blank lines dropped, and
    ``${ENV_VAR}`` references expanded.

    :param content: the content of the requirements file
    """
    numbered: ReqFileLines = enumerate(content.splitlines(), start=1)
    for stage in (join_lines, ignore_comments, expand_env_variables):
        numbered = stage(numbered)
    return numbered

185 

186 

def handle_requirement_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
) -> ParsedRequirement:
    """Build a ParsedRequirement from a line known to carry a requirement.

    :param line: The parsed line; ``line.requirement`` must not be None.
    :param options: CLI options (accepted for interface symmetry; not read
        by this function's body).
    """
    # Preserved for the nested code path.
    tag = "-c" if line.constraint else "-r"
    line_comes_from = f"{tag} {line.filename} (line {line.lineno})"

    assert line.requirement is not None

    # Pick the option destinations that may be scoped to this requirement.
    supported_dest = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )
    req_options = {
        dest: line.opts.__dict__[dest]
        for dest in supported_dest
        if line.opts.__dict__.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )

219 

220 

def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    session: PipSession | None = None,
) -> None:
    """Apply an option-only requirements-file line to the given objects.

    Mutates ``options`` (percolates values upward), ``finder`` (index URLs,
    find-links, release control, prefer-binary) and ``session`` (auth URLs,
    trusted hosts) in place.

    :param opts: Parsed option values for the line.
    :param filename: Path/URL of the requirements file being parsed.
    :param lineno: Line number within that file.
    :param finder: Optional PackageFinder to update.
    :param options: Optional CLI options object to update.
    :param session: Optional PipSession to update.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            options.features_enabled.extend(
                f for f in opts.features_enabled if f not in options.features_enabled
            )

    # set finder options
    if finder:
        find_links = finder.find_links
        index_urls = finder.index_urls
        no_index = finder.search_scope.no_index
        if opts.no_index is True:
            no_index = True
            index_urls = []
        if opts.index_url and not no_index:
            index_urls = [opts.index_url]
        if opts.extra_index_urls and not no_index:
            index_urls.extend(opts.extra_index_urls)
        if opts.find_links:
            # FIXME: it would be nice to keep track of the source
            # of the find_links: support a find-links local path
            # relative to a requirements file.
            value = opts.find_links[0]
            req_dir = os.path.dirname(os.path.abspath(filename))
            relative_to_reqs_file = os.path.join(req_dir, value)
            if os.path.exists(relative_to_reqs_file):
                value = relative_to_reqs_file
            find_links.append(value)

        if session:
            # We need to update the auth urls in session
            session.update_index_urls(index_urls)

        search_scope = SearchScope(
            find_links=find_links,
            index_urls=index_urls,
            no_index=no_index,
        )
        finder.search_scope = search_scope

        # Transform --pre into --all-releases :all:
        if opts.pre:
            if not opts.release_control:
                opts.release_control = ReleaseControl()
            opts.release_control.all_releases.add(":all:")

        if opts.release_control:
            if not finder.release_control:
                # First time seeing release_control, set it on finder
                finder.set_release_control(opts.release_control)

        if opts.prefer_binary:
            finder.set_prefer_binary()

    if session:
        for host in opts.trusted_hosts or []:
            # Fix: report the actual requirements file as the source of the
            # trusted host, not a hard-coded "(unknown)" placeholder.
            source = f"line {lineno} of {filename}"
            session.add_trusted_host(host, source=source)

297 

298 

def handle_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
    finder: PackageFinder | None = None,
    session: PipSession | None = None,
) -> ParsedRequirement | None:
    """Dispatch a single parsed requirements line.

    A requirement-carrying line is turned into a ParsedRequirement; an
    option-only line updates the finder/session/options instead.

    :param line: The parsed line to be processed.
    :param options: CLI options.
    :param finder: The finder - updated by non-requirement lines.
    :param session: The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For requirement lines only the options from SUPPORTED_OPTIONS_REQ take
    effect, scoped to the requirement; other options are ignored. For
    non-requirement lines only options from SUPPORTED_OPTIONS take effect
    (several may appear on one line) and they affect the finder.
    """
    if line.requirement is None:
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None
    return handle_requirement_line(line, options)

341 

342 

class RequirementsFileParser:
    """Parses a requirements file, recursing into nested -r/-c references."""

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        yield from self._parse_and_recurse(
            filename, constraint, [{os.path.abspath(filename): None}]
        )

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: list[dict[str, str | None]],
    ) -> Generator[ParsedLine, None, None]:
        """Yield lines from *filename*, descending into -r/-c references.

        :param parsed_files_stack: Stack of {abs path: including file} maps
            used to detect recursive inclusion.
        :raises RequirementsFileParseError: On a recursive reference.
        """
        for line in self._parse_file(filename, constraint):
            if line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work
                    # and then abspath so that we can identify recursive references
                    req_path = os.path.abspath(
                        os.path.join(
                            os.path.dirname(filename),
                            req_path,
                        )
                    )
                parsed_files = parsed_files_stack[0]
                if req_path in parsed_files:
                    initial_file = parsed_files[req_path]
                    tail = (
                        f" and again in {initial_file}"
                        if initial_file is not None
                        else ""
                    )
                    # Fix: name the file containing the recursive reference
                    # instead of a hard-coded "(unknown)" placeholder.
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself in {filename}{tail}"
                    )
                # Keeping a track where was each file first included in
                new_parsed_files = parsed_files.copy()
                new_parsed_files[req_path] = filename
                yield from self._parse_and_recurse(
                    req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Fetch, preprocess, and parse one file's lines (no recursion)."""
        _, content = get_file_content(filename, self._session, constraint=constraint)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg)

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )

434 

435 

def get_line_parser(finder: PackageFinder | None) -> LineParser:
    """Return a callable that parses one requirements-file line into
    (args_str, options), seeding defaults from *finder* when given."""

    def parse_line(line: str) -> tuple[str, Values]:
        # A fresh parser per line: appendable options accumulate state.
        parser = build_parser()
        defaults = parser.get_default_values()
        defaults.index_url = None
        if finder:
            defaults.format_control = finder.format_control
            defaults.release_control = finder.release_control

        args_str, options_str = break_args_options(line)

        try:
            tokens = shlex.split(options_str)
        except ValueError as exc:
            raise OptionParsingError(f"Could not split options: {options_str}") from exc

        opts, _ = parser.parse_args(tokens, defaults)
        return args_str, opts

    return parse_line

459 

460 

def break_args_options(line: str) -> tuple[str, str]:
    """Break up the line into an args and options string. We only want to shlex
    (and then optparse) the options, not the args. args can contain markers
    which are corrupted by shlex.
    """
    tokens = line.split(" ")
    # Everything before the first option-looking token is args; the rest
    # (including that token) is options.
    for index, token in enumerate(tokens):
        if token.startswith("-"):
            return " ".join(tokens[:index]), " ".join(tokens[index:])
    return " ".join(tokens), ""

476 

477 

class OptionParsingError(Exception):
    """Raised when optparse fails to parse a requirements-file option string.

    The message is kept on ``msg`` (not ``args``) so callers can embed it in
    a richer RequirementsFileParseError.
    """

    def __init__(self, msg: str) -> None:
        self.msg = msg

481 

482 

def build_parser() -> optparse.OptionParser:
    """Return a parser for parsing requirement lines."""
    parser = optparse.OptionParser(add_help_option=False)

    for make_option in [*SUPPORTED_OPTIONS, *SUPPORTED_OPTIONS_REQ]:
        parser.add_option(make_option())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    def parser_exit(self: Any, msg: str) -> NoReturn:
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    # https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser

504 

505 

def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    r"""Merge physical lines ending in '\' into single logical lines.

    Comment lines are never treated as continuations. A joined line is
    reported under the number of its first physical line.
    """
    first_lineno = None
    pending: list[str] = []
    for lineno, text in lines_enum:
        is_comment = bool(COMMENT_RE.match(text))
        if text.endswith("\\") and not is_comment:
            # Continuation: buffer the fragment without its backslash(es).
            if not pending:
                first_lineno = lineno
            pending.append(text.strip("\\"))
            continue
        if is_comment:
            # Prefix a space so the comment regex still matches after joining.
            text = " " + text
        if pending:
            pending.append(text)
            assert first_lineno is not None
            yield first_lineno, "".join(pending)
            pending = []
        else:
            yield lineno, text

    # A trailing continuation with no following line is still emitted.
    if pending:
        assert first_lineno is not None
        yield first_lineno, "".join(pending)

    # TODO: handle space after '\'.

533 

534 # TODO: handle space after '\'. 

535 

536 

def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """Strip comments and drop lines that end up empty."""
    for lineno, raw in lines_enum:
        stripped = COMMENT_RE.sub("", raw).strip()
        if stripped:
            yield lineno, stripped

546 

547 

def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Replace all environment variables that can be retrieved via `os.getenv`.

    The only allowed format for environment variables defined in the
    requirement file is `${MY_VARIABLE_1}` to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).
    """
    for lineno, text in lines_enum:
        for placeholder, name in ENV_VAR_RE.findall(text):
            replacement = os.getenv(name)
            # Unset or empty variables are left as literal ${NAME} text.
            if replacement:
                text = text.replace(placeholder, replacement)
        yield lineno, text

573 

574 

def get_file_content(
    url: str, session: PipSession, *, constraint: bool = False
) -> tuple[str, str]:
    """Gets the content of a file; it may be a filename, file: URL, or
    http: URL. Returns (location, content). Content is unicode.
    Respects # -*- coding: declarations on the retrieved files.

    :param url: File path or url.
    :param session: PipSession instance.
    """
    scheme = urllib.parse.urlsplit(url).scheme
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if scheme in ("http", "https", "file"):
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as fp:
            raw_content = fp.read()
    except OSError as exc:
        kind = "constraint" if constraint else "requirements"
        raise InstallationError(f"Could not open {kind} file: {exc}")

    return url, _decode_req_file(raw_content, url)

606 

607 

def _decode_req_file(data: bytes, url: str) -> str:
    """Decode raw requirements-file bytes to text.

    Tries, in order: a Unicode BOM, a PEP 263 ``# coding:`` declaration in
    the first two lines, UTF-8, and finally the locale's preferred encoding
    (with a warning).

    :param data: Raw file content.
    :param url: Path or URL the data came from, used only for the warning.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    for line in data.split(b"\n")[:2]:
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.groups()[0].decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        # Fix: use the module-level logger (as the rest of this module does)
        # rather than the root logger, so the warning honors pip's logging
        # configuration.
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)