Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_file.py: 82%

234 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1""" 

2Requirements file parsing 

3""" 

4 

5import logging 

6import optparse 

7import os 

8import re 

9import shlex 

10import urllib.parse 

11from optparse import Values 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Callable, 

16 Dict, 

17 Generator, 

18 Iterable, 

19 List, 

20 NoReturn, 

21 Optional, 

22 Tuple, 

23) 

24 

25from pip._internal.cli import cmdoptions 

26from pip._internal.exceptions import InstallationError, RequirementsFileParseError 

27from pip._internal.models.search_scope import SearchScope 

28from pip._internal.network.session import PipSession 

29from pip._internal.network.utils import raise_for_status 

30from pip._internal.utils.encoding import auto_decode 

31from pip._internal.utils.urls import get_url_scheme 

32 

33if TYPE_CHECKING: 

34 from pip._internal.index.package_finder import PackageFinder 

35 

36__all__ = ["parse_requirements"] 

37 

# An iterable of (line number, line text) pairs.
ReqFileLines = Iterable[Tuple[int, str]]

# Callable that splits one line into (requirement text, parsed option values).
LineParser = Callable[[str], Tuple[str, Values]]

# Case-insensitive match for locations that start with a supported URL scheme.
SCHEME_RE = re.compile(r"^(http|https|file):", re.IGNORECASE)
# A '#' at the start of the line or preceded by whitespace, through end of line.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Environment-variable placeholders of the form '${MY_VARIABLE_1}'. The name
# may contain only uppercase letters, digits and '_' (underscore), following
# the POSIX standard defined in IEEE Std 1003.1, 2013 Edition. Group 'var' is
# the whole placeholder, group 'name' the bare variable name.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")

50 

# Option factories registered with the per-line parser; these take effect on
# lines that do not carry a requirement.
SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# Option factories whose values are passed along to an individual requirement.
SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [
    cmdoptions.global_options,
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# Subset of per-requirement options that apply to editable requirements.
SUPPORTED_OPTIONS_EDITABLE_REQ: List[Callable[..., optparse.Option]] = [
    cmdoptions.config_settings,
]


# The 'dest' string of each per-requirement option, obtained by instantiating
# the option once at import time.
SUPPORTED_OPTIONS_REQ_DEST = [
    str(factory().dest) for factory in SUPPORTED_OPTIONS_REQ
]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
    str(factory().dest) for factory in SUPPORTED_OPTIONS_EDITABLE_REQ
]

logger = logging.getLogger(__name__)

87 

88 

class ParsedRequirement:
    """Plain record describing one requirement read from a requirements file.

    :param requirement: The requirement text.
    :param is_editable: Whether the requirement was given as an editable.
    :param comes_from: Description of where the requirement originated.
    :param constraint: Whether the requirement comes from a constraints file.
    :param options: Per-requirement options keyed by option ``dest``, if any.
    :param line_source: Description of the file and line, if known.
    """

    def __init__(
        self,
        requirement: str,
        is_editable: bool,
        comes_from: str,
        constraint: bool,
        options: Optional[Dict[str, Any]] = None,
        line_source: Optional[str] = None,
    ) -> None:
        # What was requested and how.
        self.requirement = requirement
        self.is_editable = is_editable
        self.constraint = constraint
        self.options = options
        # Provenance, kept for later reporting.
        self.comes_from = comes_from
        self.line_source = line_source

105 

106 

class ParsedLine:
    """One logical line of a requirements file after option parsing.

    ``is_requirement`` is always set. ``is_editable`` and ``requirement`` are
    set only when the line carries a requirement (either positional text or
    an editable option); option-only lines do not define them.

    :param filename: File the line was read from.
    :param lineno: Line number within that file.
    :param args: Requirement text preceding any options (may be empty).
    :param opts: Parsed option values for the line.
    :param constraint: Whether the file is being read as a constraints file.
    """

    def __init__(
        self,
        filename: str,
        lineno: int,
        args: str,
        opts: Values,
        constraint: bool,
    ) -> None:
        self.filename = filename
        self.lineno = lineno
        self.opts = opts
        self.constraint = constraint

        if args:
            # Positional requirement text takes precedence over editables.
            self.is_requirement = True
            self.is_editable = False
            self.requirement = args
            return

        if opts.editables:
            self.is_requirement = True
            self.is_editable = True
            # We don't support multiple -e on one line
            self.requirement = opts.editables[0]
            return

        # Option-only line: nothing to install.
        self.is_requirement = False

132 

133 

def parse_requirements(
    filename: str,
    session: PipSession,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily yield a ParsedRequirement for each requirement line of a file.

    Lines that only carry options are handed to ``handle_line`` as well, but
    produce no output.

    :param filename:    Path or url of requirements file.
    :param session:     PipSession instance.
    :param finder:      Instance of pip.index.PackageFinder.
    :param options:     cli options.
    :param constraint:  If true, parsing a constraint file rather than
        requirements file.
    """
    parser = RequirementsFileParser(session, get_line_parser(finder))

    for parsed_line in parser.parse(filename, constraint):
        parsed_req = handle_line(
            parsed_line, options=options, finder=finder, session=session
        )
        if parsed_req is None:
            # Option-only line: handled for its side effects, nothing to yield.
            continue
        yield parsed_req

159 

160 

def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into an iterator of cleaned, numbered lines.

    Lines are numbered from 1, then passed through ``join_lines``,
    ``ignore_comments`` and ``expand_env_variables``, in that order.

    :param content: the content of the requirements file
    """
    numbered: ReqFileLines = enumerate(content.splitlines(), start=1)
    return expand_env_variables(ignore_comments(join_lines(numbered)))

171 

172 

def handle_requirement_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
) -> ParsedRequirement:
    """Build a ParsedRequirement from a line that carries a requirement.

    :param line: Parsed line; must have ``is_requirement`` set.
    :param options: CLI options (not consulted by this function).
    :return: The requirement together with the per-requirement options that
        were given a truthy value on the line.
    """
    # preserve for the nested code path
    origin_flag = "-c" if line.constraint else "-r"
    line_comes_from = f"{origin_flag} {line.filename} (line {line.lineno})"

    assert line.is_requirement

    # Editable requirements accept a narrower set of per-requirement options.
    supported_dest = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )

    # Keep only the supported options that were actually set on this line.
    parsed_values = vars(line.opts)
    req_options = {
        dest: parsed_values[dest]
        for dest in supported_dest
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )

205 

206 

def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    session: Optional[PipSession] = None,
) -> None:
    """Apply the options of a requirement-less line to their targets.

    :param opts: Option values parsed from the line.
    :param filename: File the line came from (used in messages and to resolve
        a relative find-links path).
    :param lineno: Line number within ``filename``.
    :param finder: If given, its search scope and flags are updated.
    :param options: If given, ``require_hashes`` and ``features_enabled`` are
        propagated onto it.
    :param session: If given, its index URLs and trusted hosts are updated
        (only when ``finder`` is given as well).
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            # Membership is re-checked against the growing list, so features
            # repeated on the line are added only once.
            for feature in opts.features_enabled:
                if feature not in options.features_enabled:
                    options.features_enabled.append(feature)

    # Everything below configures the finder; without one there is nothing
    # left to do.
    if not finder:
        return

    find_links = finder.find_links
    index_urls = finder.index_urls
    no_index = finder.search_scope.no_index
    if opts.no_index is True:
        no_index, index_urls = True, []
    if opts.index_url and not no_index:
        index_urls = [opts.index_url]
    if opts.extra_index_urls and not no_index:
        index_urls.extend(opts.extra_index_urls)
    if opts.find_links:
        # FIXME: it would be nice to keep track of the source
        # of the find_links: support a find-links local path
        # relative to a requirements file.
        link = opts.find_links[0]
        reqs_dir = os.path.dirname(os.path.abspath(filename))
        candidate = os.path.join(reqs_dir, link)
        # Prefer the path relative to the requirements file when it exists.
        find_links.append(candidate if os.path.exists(candidate) else link)

    if session:
        # We need to update the auth urls in session
        session.update_index_urls(index_urls)

    finder.search_scope = SearchScope(
        find_links=find_links,
        index_urls=index_urls,
        no_index=no_index,
    )

    if opts.pre:
        finder.set_allow_all_prereleases()

    if opts.prefer_binary:
        finder.set_prefer_binary()

    if session:
        source = f"line {lineno} of {filename}"
        for host in opts.trusted_hosts or []:
            session.add_trusted_host(host, source=source)

275 

276 

def handle_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
    finder: Optional["PackageFinder"] = None,
    session: Optional[PipSession] = None,
) -> Optional[ParsedRequirement]:
    """Dispatch one parsed line to the requirement or the option handler.

    :param line:        The parsed line to be processed.
    :param options:     CLI options.
    :param finder:      The finder - updated by non-requirement lines.
    :param session:     The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For lines that contain requirements, the only options that have an effect
    are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
    requirement. Other options from SUPPORTED_OPTIONS may be present, but are
    ignored.

    For lines that do not contain requirements, the only options that have an
    effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
    be present, but are ignored. These lines may contain multiple options
    (although our docs imply only one is supported), and all are parsed and
    affect the finder.
    """
    if not line.is_requirement:
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)

319 

320 

class RequirementsFileParser:
    """Parse a requirements file into ``ParsedLine`` objects.

    Option-only lines that reference another file (``-r``/``-c``) are not
    yielded; the referenced file is parsed in their place. A file that ends
    up including itself, directly or through other files, is reported with a
    ``RequirementsFileParseError`` instead of recursing without bound.

    :param session: Session used to fetch file contents.
    :param line_parser: Callable splitting a line into requirement text and
        parsed option values.
    """

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        yield from self._parse_and_recurse(filename, constraint)

    @staticmethod
    def _include_key(location: str) -> str:
        """Return a comparable identity for a requirements file location.

        URLs are compared as given; local paths are made absolute and
        case-normalised so different spellings of one path compare equal.
        """
        if SCHEME_RE.search(location):
            return location
        return os.path.normcase(os.path.abspath(location))

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        _include_chain: Tuple[str, ...] = (),
    ) -> Generator[ParsedLine, None, None]:
        """Yield the lines of ``filename``, descending into nested files.

        :param filename: Path or URL of the file to parse.
        :param constraint: Whether the file is read as a constraints file.
        :param _include_chain: Identities of the files currently being parsed
            on the way to this one; internal, used for cycle detection.
        :raises RequirementsFileParseError: If a nested reference points back
            at a file that is already on the include chain.
        """
        include_chain = _include_chain + (self._include_key(filename),)

        for line in self._parse_file(filename, constraint):
            if not line.is_requirement and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work
                    req_path = os.path.join(
                        os.path.dirname(filename),
                        req_path,
                    )

                # Only ancestors are checked, so including the same file from
                # two unrelated branches remains allowed.
                if self._include_key(req_path) in include_chain:
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself "
                        f"(included again from {filename} line {line.lineno})"
                    )

                yield from self._parse_and_recurse(
                    req_path, nested_constraint, include_chain
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Read and preprocess ``filename``, yielding one ParsedLine per line.

        :raises RequirementsFileParseError: If a line's options cannot be
            parsed; the message includes the offending line.
        """
        _, content = get_file_content(filename, self._session)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )

389 

390 

def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
    """Return a function that parses a single requirements-file line.

    :param finder: If given, its ``format_control`` seeds the option defaults
        used for every line.
    :return: Callable mapping a line to ``(requirement text, option values)``;
        it raises ``OptionParsingError`` when the option part cannot be split.
    """

    def parse_line(line: str) -> Tuple[str, Values]:
        # Build new parser for each line since it accumulates appendable
        # options.
        line_parser = build_parser()
        default_values = line_parser.get_default_values()
        default_values.index_url = None
        if finder:
            default_values.format_control = finder.format_control

        # Only the option part goes through shlex; the requirement text is
        # returned untouched.
        args_str, options_str = break_args_options(line)

        try:
            option_tokens = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        parsed_opts, _ = line_parser.parse_args(option_tokens, default_values)
        return args_str, parsed_opts

    return parse_line

413 

414 

def break_args_options(line: str) -> Tuple[str, str]:
    """Break up the line into an args and options string.

    The line is split on single spaces; everything before the first token
    that starts with ``-`` is the args string, that token and everything
    after it is the options string. We only want to shlex (and then optparse)
    the options, not the args: args can contain markers which are corrupted
    by shlex.

    :param line: A single preprocessed requirements-file line.
    :return: ``(args, options)``; either part may be the empty string.
    """
    tokens = line.split(" ")
    # A "--long" option also starts with "-", so a single prefix test is
    # enough. Default to len(tokens) when the line carries no options at all.
    first_option = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:first_option]), " ".join(tokens[first_option:])

430 

431 

class OptionParsingError(Exception):
    """Raised when the options of a requirements-file line cannot be parsed.

    :param msg: Human-readable description of the problem; available as
        ``self.msg`` and as the exception's string form.
    """

    def __init__(self, msg: str) -> None:
        # Initialise the base class explicitly so ``str(exc)`` and
        # ``exc.args`` carry the message even when ``msg`` is passed by
        # keyword.
        super().__init__(msg)
        self.msg = msg

435 

436 

def build_parser() -> optparse.OptionParser:
    """Create an option parser for a single requirements-file line.

    The parser knows every option in SUPPORTED_OPTIONS and
    SUPPORTED_OPTIONS_REQ, has no help option, and raises
    ``OptionParsingError`` where optparse would otherwise exit the process.
    """
    parser = optparse.OptionParser(add_help_option=False)

    for make_option in SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ:
        parser.add_option(make_option())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    # The replacement is assigned on the instance, so it is not bound: its
    # first parameter receives the first positional argument of the call
    # (presumably the exit status optparse passes from error()).
    def parser_exit(self: Any, msg: str) -> "NoReturn":
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    #       https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser

458 

459 

def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    r"""Merge lines ending in '\' with the line that follows them.

    A comment line is never treated as a continuation, even if it ends in a
    backslash. The joined line takes on the index of the first line.
    """
    first_line_number = None
    pending: List[str] = []
    for line_number, line in lines_enum:
        is_comment = bool(COMMENT_RE.match(line))

        if line.endswith("\\") and not is_comment:
            # Continuation: remember where the logical line started and keep
            # collecting pieces.
            if not pending:
                first_line_number = line_number
            pending.append(line.strip("\\"))
            continue

        if is_comment:
            # this ensures comments are always matched later
            line = " " + line

        if not pending:
            yield line_number, line
            continue

        # This line terminates a run of continuation lines.
        pending.append(line)
        assert first_line_number is not None
        yield first_line_number, "".join(pending)
        pending = []

    # last line contains \
    if pending:
        assert first_line_number is not None
        yield first_line_number, "".join(pending)

    # TODO: handle space after '\'.

489 

490 

def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Remove comments and surrounding whitespace, dropping lines left empty.
    """
    for line_number, raw_line in lines_enum:
        cleaned = COMMENT_RE.sub("", raw_line).strip()
        if cleaned:
            yield line_number, cleaned

500 

501 

def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute `${NAME}` placeholders with values from the environment.

    Placeholders whose variable is unset or empty are left in the line
    unchanged.

    The only allowed format for environment variables defined in the
    requirement file is `${MY_VARIABLE_1}` to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).
    """
    for line_number, line in lines_enum:
        expanded = line
        # Placeholders are collected from the line as read; replacements are
        # then applied one after another.
        for placeholder, name in ENV_VAR_RE.findall(line):
            replacement = os.getenv(name)
            if replacement:
                expanded = expanded.replace(placeholder, replacement)

        yield line_number, expanded

527 

528 

def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
    """Fetch a requirements file given as a filename, file: URL, or http(s): URL.

    Returns (location, content). Content is unicode.
    Respects # -*- coding: declarations on the retrieved files.

    :param url:         File path or url.
    :param session:     PipSession instance.
    :raises InstallationError: If a bare path cannot be opened or read.
    """
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if get_url_scheme(url) in ("http", "https", "file"):
        response = session.get(url)
        raise_for_status(response)
        return response.url, response.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as handle:
            raw = handle.read()
            content = auto_decode(raw)
    except OSError as exc:
        raise InstallationError(f"Could not open requirements file: {exc}")
    return url, content