Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/req/req_file.py: 82%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

276 statements  

1""" 

2Requirements file parsing 

3""" 

4 

5from __future__ import annotations 

6 

7import codecs 

8import locale 

9import logging 

10import optparse 

11import os 

12import re 

13import shlex 

14import sys 

15import urllib.parse 

16from collections.abc import Callable, Generator, Iterable 

17from dataclasses import dataclass 

18from optparse import Values 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 NoReturn, 

23) 

24 

25from pip._internal.cli import cmdoptions 

26from pip._internal.exceptions import InstallationError, RequirementsFileParseError 

27from pip._internal.models.release_control import ReleaseControl 

28from pip._internal.models.search_scope import SearchScope 

29 

30if TYPE_CHECKING: 

31 from pip._internal.index.package_finder import PackageFinder 

32 from pip._internal.network.session import PipSession 

33 

__all__ = ["parse_requirements"]

# Stream of (line number, line text) pairs handed between preprocessing steps.
ReqFileLines = Iterable[tuple[int, str]]

# Callable that turns one line into (argument string, parsed option values).
LineParser = Callable[[str], tuple[str, Values]]

# Case-insensitive match for a leading "http:", "https:" or "file:" scheme.
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# A '#' at the very start of a line, or preceded by whitespace, up to the end.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Environment variable references written as '${MY_VARIABLE_1}'. The name may
# only contain uppercase letters, digits and '_' (underscore), following the
# POSIX standard defined in IEEE Std 1003.1, 2013 Edition.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")

# Option factories registered with the per-line option parser.
SUPPORTED_OPTIONS: list[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.all_releases,
    cmdoptions.only_final,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# Option factories whose values are attached to an individual requirement.
SUPPORTED_OPTIONS_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# Same idea, but for editable requirements.
SUPPORTED_OPTIONS_EDITABLE_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.config_settings,
]


# The 'dest' attribute names under which the options above store their values.
SUPPORTED_OPTIONS_REQ_DEST = [
    str(factory().dest) for factory in SUPPORTED_OPTIONS_REQ
]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
    str(factory().dest) for factory in SUPPORTED_OPTIONS_EDITABLE_REQ
]

# Order matters here: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE,
# so data.startswith(BOM_UTF16_LE) would also be true for UTF32_LE data. The
# UTF-32 marks are therefore listed ahead of the UTF-16 ones.
BOMS: list[tuple[bytes, str]] = [
    (codecs.BOM_UTF8, "utf-8"),
    (codecs.BOM_UTF32, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32-be"),
    (codecs.BOM_UTF32_LE, "utf-32-le"),
    (codecs.BOM_UTF16, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16-be"),
    (codecs.BOM_UTF16_LE, "utf-16-le"),
]

# "coding: <name>" / "coding=<name>" declaration, matched against raw bytes.
PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")
# Encoding tried when a file carries neither a BOM nor a coding declaration.
DEFAULT_ENCODING = "utf-8"

logger = logging.getLogger(__name__)

101 

102 

103@dataclass(frozen=True, slots=True) 

104class ParsedRequirement: 

105 requirement: str 

106 is_editable: bool 

107 comes_from: str 

108 constraint: bool 

109 options: dict[str, Any] | None 

110 line_source: str | None 

111 

112 

113@dataclass(frozen=True, slots=True) 

114class ParsedLine: 

115 filename: str 

116 lineno: int 

117 args: str 

118 opts: Values 

119 constraint: bool 

120 

121 @property 

122 def is_editable(self) -> bool: 

123 return bool(self.opts.editables) 

124 

125 @property 

126 def requirement(self) -> str | None: 

127 if self.args: 

128 return self.args 

129 elif self.is_editable: 

130 # We don't support multiple -e on one line 

131 return self.opts.editables[0] 

132 return None 

133 

134 

def parse_requirements(
    filename: str,
    session: PipSession,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily yield a ParsedRequirement for each requirement line of a file.

    Lines that only carry options are passed to ``handle_line`` for their
    side effects and produce no output.

    :param filename:   Path or url of requirements file.
    :param session:    PipSession instance.
    :param finder:     Instance of pip.index.PackageFinder.
    :param options:    cli options.
    :param constraint: If true, parsing a constraint file rather than
        requirements file.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for entry in file_parser.parse(filename, constraint):
        result = handle_line(entry, options=options, finder=finder, session=session)
        if result is None:
            # Option-only line: already applied, nothing to hand back.
            continue
        yield result

160 

161 

def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into an iterator of cleaned, numbered lines.

    The stages run in this order: number the physical lines from 1, join
    continuation lines, drop comments and empty lines, then expand
    environment variables.

    :param content: the content of the requirements file
    """
    numbered: ReqFileLines = enumerate(content.splitlines(), start=1)
    joined = join_lines(numbered)
    uncommented = ignore_comments(joined)
    return expand_env_variables(uncommented)

172 

173 

def handle_requirement_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
) -> ParsedRequirement:
    """Build the ParsedRequirement described by a requirement line.

    :param line: A parsed line whose ``requirement`` is not None.
    :param options: CLI options; accepted but not consulted here.
    """
    # preserve for the nested code path
    origin_flag = "-c" if line.constraint else "-r"
    line_comes_from = f"{origin_flag} {line.filename} (line {line.lineno})"

    assert line.requirement is not None

    # Editable and regular requirements accept different per-requirement
    # options, so pick the matching list of 'dest' names.
    supported_dest = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )
    parsed_values = line.opts.__dict__
    # Keep only the supported options that were given a truthy value.
    req_options = {
        dest: parsed_values[dest]
        for dest in supported_dest
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )

206 

207 

def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    session: PipSession | None = None,
) -> None:
    """Apply the options of a requirement-less line to the given objects.

    :param opts: Option values parsed from the line.
    :param filename: File the line came from; used in messages and to
        resolve relative find-links paths.
    :param lineno: Line number, used in messages.
    :param finder: If given, its search scope, release control and binary
        preference are updated from ``opts``.
    :param options: If given, ``require_hashes`` and ``features_enabled``
        are propagated onto it.
    :param session: If given, receives updated index urls and any trusted
        hosts named on the line.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            options.features_enabled.extend(
                f for f in opts.features_enabled if f not in options.features_enabled
            )

    # set finder options
    if finder:
        find_links = finder.find_links
        index_urls = finder.index_urls
        no_index = finder.search_scope.no_index
        if opts.no_index is True:
            no_index = True
            index_urls = []
        if opts.index_url and not no_index:
            index_urls = [opts.index_url]
        if opts.extra_index_urls and not no_index:
            index_urls.extend(opts.extra_index_urls)
        if opts.find_links:
            # FIXME: it would be nice to keep track of the source
            # of the find_links: support a find-links local path
            # relative to a requirements file.
            req_dir = os.path.dirname(os.path.abspath(filename))
            # Honour every --find-links given on the line, not just the
            # first one, so that no value is silently dropped.
            for value in opts.find_links:
                relative_to_reqs_file = os.path.join(req_dir, value)
                if os.path.exists(relative_to_reqs_file):
                    value = relative_to_reqs_file
                find_links.append(value)

        if session:
            # We need to update the auth urls in session
            session.update_index_urls(index_urls)

        finder.search_scope = SearchScope(
            find_links=find_links,
            index_urls=index_urls,
            no_index=no_index,
        )

        # Transform --pre into --all-releases :all:
        if opts.pre:
            if not opts.release_control:
                opts.release_control = ReleaseControl()
            opts.release_control.all_releases.add(":all:")

        # Only the first release_control seen is installed on the finder.
        if opts.release_control and not finder.release_control:
            finder.set_release_control(opts.release_control)

        if opts.prefer_binary:
            finder.set_prefer_binary()

    if session:
        source = f"line {lineno} of {filename}"
        for host in opts.trusted_hosts or []:
            session.add_trusted_host(host, source=source)

284 

285 

def handle_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
    finder: PackageFinder | None = None,
    session: PipSession | None = None,
) -> ParsedRequirement | None:
    """Dispatch one parsed line to the requirement or the option handler.

    :param line:        The parsed line to be processed.
    :param options:     CLI options.
    :param finder:      The finder - updated by non-requirement lines.
    :param session:     The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For lines that contain requirements, the only options that have an effect
    are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
    requirement. Other options from SUPPORTED_OPTIONS may be present, but are
    ignored.

    For lines that do not contain requirements, the only options that have an
    effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
    be present, but are ignored. These lines may contain multiple options
    (although our docs imply only one is supported), and all are parsed and
    affect the finder.
    """
    if line.requirement is None:
        # Option-only line: applied for its side effects, yields nothing.
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)

328 

329 

class RequirementsFileParser:
    """Parse requirements files, following nested ``-r`` / ``-c`` references."""

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        """Store the session used to fetch files and the per-line parser."""
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        # Nested URL references are tracked verbatim (see the urljoin below),
        # so the root must be keyed the same way; only bare paths are made
        # absolute. Otherwise a URL file including itself is fetched once
        # more before the recursion is noticed.
        if SCHEME_RE.search(filename):
            root_key = filename
        else:
            root_key = os.path.abspath(filename)
        yield from self._parse_and_recurse(filename, constraint, [{root_key: None}])

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: list[dict[str, str | None]],
    ) -> Generator[ParsedLine, None, None]:
        """Yield the lines of ``filename``, descending into nested files.

        ``parsed_files_stack[0]`` maps every file on the current inclusion
        chain to the file that included it (None for the root).

        :raises RequirementsFileParseError: if a file on the current chain
            is referenced again.
        """
        for line in self._parse_file(filename, constraint):
            if line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work
                    # and then abspath so that we can identify recursive references
                    req_path = os.path.abspath(
                        os.path.join(
                            os.path.dirname(filename),
                            req_path,
                        )
                    )
                parsed_files = parsed_files_stack[0]
                if req_path in parsed_files:
                    initial_file = parsed_files[req_path]
                    tail = (
                        f" and again in {initial_file}"
                        if initial_file is not None
                        else ""
                    )
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself in {filename}{tail}"
                    )
                # Keeping a track where was each file first included in
                new_parsed_files = parsed_files.copy()
                new_parsed_files[req_path] = filename
                yield from self._parse_and_recurse(
                    req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Yield a ParsedLine for every preprocessed line of one file.

        :raises RequirementsFileParseError: if the options on a line cannot
            be parsed; the original error is kept as the cause.
        """
        _, content = get_file_content(filename, self._session, constraint=constraint)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )

421 

422 

def get_line_parser(finder: PackageFinder | None) -> LineParser:
    """Return a function that parses one requirements-file line.

    :param finder: If truthy, its format and release controls seed the
        option defaults used for every line.
    """

    def parse_line(line: str) -> tuple[str, Values]:
        """Split ``line`` into its argument string and parsed options.

        :raises OptionParsingError: if the option part cannot be tokenised.
        """
        # Build new parser for each line since it accumulates appendable
        # options.
        line_options_parser = build_parser()
        defaults = line_options_parser.get_default_values()
        defaults.index_url = None
        if finder:
            defaults.format_control = finder.format_control
            defaults.release_control = finder.release_control

        args_str, options_str = break_args_options(line)

        try:
            option_tokens = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        # Leftover positional arguments from optparse are deliberately unused.
        parsed_opts = line_options_parser.parse_args(option_tokens, defaults)[0]
        return args_str, parsed_opts

    return parse_line

446 

447 

def break_args_options(line: str) -> tuple[str, str]:
    """Break up the line into an args and options string.

    Only the options are meant to be run through shlex (and then optparse);
    args can contain markers which are corrupted by shlex.

    The line is split on single spaces. Everything before the first token
    starting with ``-`` is the args string; that token and everything after
    it is the options string. Either part may be empty.

    :param line: One logical line of a requirements file.
    :returns: ``(args, options)``, each re-joined with single spaces.
    """
    tokens = line.split(" ")
    # Index of the first option-looking token. A "-" prefix also covers "--".
    first_option = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:first_option]), " ".join(tokens[first_option:])

463 

464 

class OptionParsingError(Exception):
    """Raised when the option part of a requirements-file line is invalid."""

    def __init__(self, msg: str) -> None:
        """Keep the parser's message available as ``msg``."""
        super().__init__(msg)
        self.msg = msg

468 

469 

def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines
    """
    parser = optparse.OptionParser(add_help_option=False)

    for make_option in SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ:
        parser.add_option(make_option())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    def parser_exit(self: Any, msg: str) -> NoReturn:
        # Installed as a plain instance attribute, so it is not bound: the
        # first positional argument supplied by the caller lands in 'self'.
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    #       https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser

491 

492 

def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    r"""Join each line ending in a backslash with the line that follows it.

    Comment lines are never treated as continuations, even if they end in a
    backslash. A joined line takes on the line number of its first physical
    line. A trailing continuation with nothing after it is still yielded.

    :param lines_enum: (line number, text) pairs for the physical lines.
    :returns: (line number, text) pairs for the logical lines.
    """
    primary_line_number = None
    new_line: list[str] = []
    for line_number, line in lines_enum:
        is_comment = COMMENT_RE.match(line)
        if not line.endswith("\\") or is_comment:
            if is_comment:
                # this ensures comments are always matched later
                line = " " + line
            if new_line:
                new_line.append(line)
                assert primary_line_number is not None
                yield primary_line_number, "".join(new_line)
                new_line = []
            else:
                yield line_number, line
        else:
            if not new_line:
                primary_line_number = line_number
            # Drop only the trailing continuation marker; a backslash at the
            # start of the line (e.g. a UNC path) is part of the content.
            new_line.append(line.rstrip("\\"))

    # last line contains \
    if new_line:
        assert primary_line_number is not None
        yield primary_line_number, "".join(new_line)

    # TODO: handle space after '\'.

522 

523 

def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Strip comments and surrounding whitespace; skip lines left empty.
    """
    for line_number, raw_line in lines_enum:
        cleaned = COMMENT_RE.sub("", raw_line).strip()
        if not cleaned:
            continue
        yield line_number, cleaned

533 

534 

def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute `${MY_VARIABLE_1}` references with values from `os.getenv`.

    A reference whose variable is unset or empty is left in the line as-is.

    The only allowed format for environment variables defined in the
    requirement file is `${MY_VARIABLE_1}` to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).
    """
    for line_number, line in lines_enum:
        # References are collected once, before any substitution is applied.
        references = ENV_VAR_RE.findall(line)
        for placeholder, name in references:
            replacement = os.getenv(name)
            if replacement:
                line = line.replace(placeholder, replacement)

        yield line_number, line

560 

561 

def get_file_content(
    url: str, session: PipSession, *, constraint: bool = False
) -> tuple[str, str]:
    """Gets the content of a file; it may be a filename, file: URL, or
    http: URL.  Returns (location, content).  Content is unicode.

    http, https and file URLs are fetched through ``session``. Anything else
    is opened as a local path and its bytes are decoded by
    ``_decode_req_file``.

    :param url:         File path or url.
    :param session:     PipSession instance.
    :param constraint:  Only affects the wording of the error raised when a
        local file cannot be opened ("constraint" vs "requirements").
    :raises InstallationError: if a local file cannot be opened or read; the
        underlying OSError is kept as the cause.
    """
    scheme = urllib.parse.urlsplit(url).scheme
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if scheme in ["http", "https", "file"]:
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as f:
            raw_content = f.read()
    except OSError as exc:
        kind = "constraint" if constraint else "requirements"
        raise InstallationError(f"Could not open {kind} file: {exc}") from exc

    content = _decode_req_file(raw_content, url)

    return url, content

593 

594 

def _decode_req_file(data: bytes, url: str) -> str:
    """Decode the raw bytes of a requirements file into text.

    The encoding is chosen by the first rule that applies:

    1. a byte order mark listed in ``BOMS`` (the mark itself is removed);
    2. a PEP 263 style ``coding`` declaration on a comment line among the
       first two lines;
    3. ``DEFAULT_ENCODING``;
    4. if that fails, the locale's preferred encoding, with a warning.

    :param data: Raw file content.
    :param url: Where the data came from; only used in the warning message.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    for line in data.split(b"\n")[:2]:
        # Slicing (rather than indexing) keeps this safe for empty lines.
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.groups()[0].decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        # Use the module logger, not the root-level logging.warning(), which
        # bypasses this module's logger and may implicitly call basicConfig().
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)