Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_file.py: 81%
232 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1"""
2Requirements file parsing
3"""
5import logging
6import optparse
7import os
8import re
9import shlex
10import urllib.parse
11from optparse import Values
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Callable,
16 Dict,
17 Generator,
18 Iterable,
19 List,
20 Optional,
21 Tuple,
22)
24from pip._internal.cli import cmdoptions
25from pip._internal.exceptions import InstallationError, RequirementsFileParseError
26from pip._internal.models.search_scope import SearchScope
27from pip._internal.network.session import PipSession
28from pip._internal.network.utils import raise_for_status
29from pip._internal.utils.encoding import auto_decode
30from pip._internal.utils.urls import get_url_scheme
32if TYPE_CHECKING:
33 # NoReturn introduced in 3.6.2; imported only for type checking to maintain
34 # pip compatibility with older patch versions of Python 3.6
35 from typing import NoReturn
37 from pip._internal.index.package_finder import PackageFinder
# Only parse_requirements is exported as the public entry point of this module.
__all__ = ["parse_requirements"]

# Stream of (line number, line text) pairs handed between preprocessing steps.
ReqFileLines = Iterable[Tuple[int, str]]

# Callable turning one preprocessed line into
# (requirement text, parsed option values).
LineParser = Callable[[str], Tuple[str, Values]]

# Recognizes locations that start with an http:, https: or file: scheme
# (case-insensitive).
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# Matches a '#' comment running to the end of the line; the '#' must be at the
# start of the string or be preceded by whitespace.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")

# Option factories registered on the per-line parser; these are the options
# that take effect on lines which do not contain a requirement.
SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# options to be passed to requirements
SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [
    cmdoptions.global_options,
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# the 'dest' string values
# (each factory is called once here just to read the option's ``dest``)
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ParsedRequirement:
    """Plain record describing one requirement read from a requirements file.

    Attributes (all stored exactly as passed in):
        requirement: the requirement text.
        is_editable: whether the requirement came from an editable option.
        comes_from: description of where the requirement was found.
        constraint: whether the originating file was parsed as constraints.
        options: per-requirement options, or None when none were given.
        line_source: human-readable "line N of FILE" string, or None.
    """

    def __init__(
        self,
        requirement: str,
        is_editable: bool,
        comes_from: str,
        constraint: bool,
        options: Optional[Dict[str, Any]] = None,
        line_source: Optional[str] = None,
    ) -> None:
        # What is being required.
        self.requirement = requirement
        self.is_editable = is_editable
        self.constraint = constraint

        # Where it was found.
        self.comes_from = comes_from
        self.line_source = line_source

        # Extra per-requirement settings (may be None).
        self.options = options
class ParsedLine:
    """One logical line of a requirements file after option parsing.

    ``is_requirement`` is always set. ``is_editable`` and ``requirement`` are
    only set when the line actually carries a requirement, either as
    positional text (``args``) or through an editable option.
    """

    def __init__(
        self,
        filename: str,
        lineno: int,
        args: str,
        opts: Values,
        constraint: bool,
    ) -> None:
        self.filename = filename
        self.lineno = lineno
        self.opts = opts
        self.constraint = constraint

        # Neither positional text nor an editable: this is an option-only line.
        if not args and not opts.editables:
            self.is_requirement = False
            return

        self.is_requirement = True
        # Positional text takes precedence over any editable option.
        self.is_editable = not args
        # We don't support multiple -e on one line
        self.requirement = args if args else opts.editables[0]
def parse_requirements(
    filename: str,
    session: PipSession,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily yield a ParsedRequirement for every requirement line found.

    Lines that only carry options are passed through ``handle_line`` as well,
    but produce nothing in the output stream.

    :param filename:    Path or url of requirements file.
    :param session:     PipSession instance.
    :param finder:      Instance of pip.index.PackageFinder.
    :param options:     cli options.
    :param constraint:  If true, parsing a constraint file rather than
        requirements file.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for entry in file_parser.parse(filename, constraint):
        requirement = handle_line(
            entry, options=options, finder=finder, session=session
        )
        if requirement is None:
            # Option-only line: handled for its side effects, nothing to emit.
            continue
        yield requirement
def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into an iterator of cleaned-up logical lines.

    Steps, applied in this order: number the physical lines starting at 1,
    join continued lines, drop comments and empty lines, expand environment
    variables.

    :param content: the content of the requirements file
    """
    numbered_lines: ReqFileLines = enumerate(content.splitlines(), start=1)
    return expand_env_variables(ignore_comments(join_lines(numbered_lines)))
def handle_requirement_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
) -> ParsedRequirement:
    """Build the ParsedRequirement for a line that carries a requirement.

    :param line: Parsed line; its ``is_requirement`` flag must be set.
    :param options: Accepted for signature compatibility; not read here.
    """
    # preserve for the nested code path
    origin_flag = "-c" if line.constraint else "-r"
    line_comes_from = f"{origin_flag} {line.filename} (line {line.lineno})"

    assert line.is_requirement

    if line.is_editable:
        # For editable requirements, we don't support per-requirement
        # options, so just return the parsed requirement.
        return ParsedRequirement(
            requirement=line.requirement,
            is_editable=line.is_editable,
            comes_from=line_comes_from,
            constraint=line.constraint,
        )

    # Keep only the requirement-scoped options that were given a truthy value.
    parsed_values = line.opts.__dict__
    req_options = {
        dest: parsed_values[dest]
        for dest in SUPPORTED_OPTIONS_REQ_DEST
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )
def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    session: Optional[PipSession] = None,
) -> None:
    """Apply the options of a line that carries no requirement.

    :param opts: Option values parsed from the line.
    :param filename: Requirements file the line came from.
    :param lineno: Line number within that file.
    :param finder: When given, its search scope and flags are updated.
    :param options: When given, receives require_hashes / features_enabled.
    :param session: When given (together with a finder), receives the index
        urls and any trusted hosts.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            for feature in opts.features_enabled:
                if feature not in options.features_enabled:
                    options.features_enabled.append(feature)

    # Everything below configures the finder; nothing to do without one.
    if not finder:
        return

    find_links = finder.find_links
    index_urls = finder.index_urls
    no_index = finder.search_scope.no_index
    if opts.no_index is True:
        no_index = True
        index_urls = []
    if not no_index:
        # Index options only count while indexes are still enabled.
        if opts.index_url:
            index_urls = [opts.index_url]
        if opts.extra_index_urls:
            index_urls.extend(opts.extra_index_urls)
    if opts.find_links:
        # FIXME: it would be nice to keep track of the source
        # of the find_links: support a find-links local path
        # relative to a requirements file.
        link = opts.find_links[0]
        requirements_dir = os.path.dirname(os.path.abspath(filename))
        candidate = os.path.join(requirements_dir, link)
        if os.path.exists(candidate):
            link = candidate
        find_links.append(link)

    if session:
        # We need to update the auth urls in session
        session.update_index_urls(index_urls)

    finder.search_scope = SearchScope(
        find_links=find_links,
        index_urls=index_urls,
        no_index=no_index,
    )

    if opts.pre:
        finder.set_allow_all_prereleases()

    if opts.prefer_binary:
        finder.set_prefer_binary()

    if session:
        source = f"line {lineno} of {filename}"
        for host in opts.trusted_hosts or []:
            session.add_trusted_host(host, source=source)
def handle_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
    finder: Optional["PackageFinder"] = None,
    session: Optional[PipSession] = None,
) -> Optional[ParsedRequirement]:
    """Dispatch one parsed line to the requirement or the option handler.

    :param line:        The parsed line to be processed.
    :param options:     CLI options.
    :param finder:      The finder - updated by non-requirement lines.
    :param session:     The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For lines that contain requirements, the only options that have an effect
    are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
    requirement. Other options from SUPPORTED_OPTIONS may be present, but are
    ignored.

    For lines that do not contain requirements, the only options that have an
    effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
    be present, but are ignored. These lines may contain multiple options
    (although our docs imply only one is supported), and all are parsed and
    affect the finder.
    """
    if not line.is_requirement:
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)
class RequirementsFileParser:
    """Reads requirements files and yields a ParsedLine per logical line.

    Lines that reference another requirements or constraints file are not
    yielded themselves; the referenced file is parsed in their place, depth
    first. A file that ends up including itself (directly or through other
    files) is reported as a RequirementsFileParseError instead of recursing
    until the interpreter's recursion limit is hit.
    """

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        """
        :param session: Session used to fetch file contents.
        :param line_parser: Callable splitting a line into
            (requirement text, option values).
        """
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        yield from self._parse_and_recurse(filename, constraint)

    @staticmethod
    def _location_key(location: str) -> str:
        """Return a comparable identity for a requirements file location.

        URLs are compared verbatim; bare paths are made absolute and
        case-normalized so that different spellings of the same path match.
        """
        if SCHEME_RE.search(location):
            return location
        return os.path.normcase(os.path.abspath(location))

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        _include_chain: Tuple[str, ...] = (),
    ) -> Generator[ParsedLine, None, None]:
        """Yield the lines of ``filename``, following nested file references.

        :param filename: Path or url of the file to parse.
        :param constraint: Whether the file is parsed as a constraints file.
        :param _include_chain: Internal. Identities of the files currently
            being parsed on the way down to ``filename``; used to detect
            include cycles.
        :raises RequirementsFileParseError: if a nested reference points back
            at a file that is already being parsed.
        """
        chain = _include_chain + (self._location_key(filename),)

        for line in self._parse_file(filename, constraint):
            if not line.is_requirement and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work
                    req_path = os.path.join(
                        os.path.dirname(filename),
                        req_path,
                    )

                # Only files on the current include chain count as a cycle;
                # including the same file twice from siblings stays allowed.
                if self._location_key(req_path) in chain:
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself in {filename}"
                    )

                yield from self._parse_and_recurse(
                    req_path, nested_constraint, chain
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Yield a ParsedLine for each preprocessed line of one file.

        :raises RequirementsFileParseError: if a line's options cannot be
            parsed; the message includes the offending line.
        """
        _, content = get_file_content(filename, self._session)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )
def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
    """Return a function that parses a single requirements-file line.

    :param finder: When given, its ``format_control`` seeds the parser's
        defaults for every line.
    """

    def parse_line(line: str) -> Tuple[str, Values]:
        """Split ``line`` into requirement text and parsed option values.

        :raises OptionParsingError: if the option part cannot be tokenized.
        """
        # Build new parser for each line since it accumulates appendable
        # options.
        line_parser = build_parser()
        seed_values = line_parser.get_default_values()
        seed_values.index_url = None
        if finder:
            seed_values.format_control = finder.format_control

        args_str, options_str = break_args_options(line)

        try:
            option_tokens = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        # Leftover positional arguments from optparse are discarded.
        parsed_values = line_parser.parse_args(option_tokens, seed_values)[0]
        return args_str, parsed_values

    return parse_line
def break_args_options(line: str) -> Tuple[str, str]:
    """Break up the line into an args and options string.

    We only want to shlex (and then optparse) the options, not the args.
    args can contain markers which are corrupted by shlex.

    The line is split on single spaces; everything before the first token
    that starts with ``-`` is the args string, that token and everything
    after it is the options string.

    :param line: One preprocessed requirements-file line.
    :returns: ``(args, options)``; either part may be the empty string.
    """
    tokens = line.split(" ")
    # Position of the first option-looking token, or the end if there is none.
    # A leading "-" already covers "--" long options.
    first_option = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:first_option]), " ".join(tokens[first_option:])
class OptionParsingError(Exception):
    """Raised when the options on a requirements-file line cannot be parsed.

    The human-readable description is available as ``msg``.
    """

    def __init__(self, msg: str) -> None:
        super().__init__(msg)
        self.msg = msg
def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines

    The parser knows every option from SUPPORTED_OPTIONS and
    SUPPORTED_OPTIONS_REQ, has no help option, and raises
    OptionParsingError where optparse would normally exit.
    """

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    def parser_exit(self: Any, msg: str) -> "NoReturn":
        raise OptionParsingError(msg)

    parser = optparse.OptionParser(add_help_option=False)

    for make_option in SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ:
        parser.add_option(make_option())

    # NOTE: mypy disallows assigning to a method
    # https://github.com/python/mypy/issues/2427
    # The function is stored on the instance, so it is not bound: the two
    # positional arguments of an ``exit(...)`` call arrive as (self, msg).
    parser.exit = parser_exit  # type: ignore

    return parser
def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    """Join lines that end in a backslash with the line(s) that follow.

    A comment line never continues onto the next line, even if it ends in a
    backslash; it terminates any join in progress. The joined line takes on
    the line number of its first physical line.

    Only the trailing continuation backslash(es) are removed from a continued
    line. Leading backslashes are content (for example a UNC path such as
    a double-backslash server share) and are preserved.

    :param lines_enum: (line number, line text) pairs.
    :returns: (line number, joined line text) pairs.
    """
    primary_line_number = None
    new_line: List[str] = []
    for line_number, line in lines_enum:
        is_comment = COMMENT_RE.match(line) is not None
        if is_comment or not line.endswith("\\"):
            if is_comment:
                # this ensures comments are always matched later
                line = " " + line
            if new_line:
                new_line.append(line)
                assert primary_line_number is not None
                yield primary_line_number, "".join(new_line)
                new_line = []
            else:
                yield line_number, line
        else:
            if not new_line:
                primary_line_number = line_number
            # rstrip, not strip: the marker is at the end of the line only.
            new_line.append(line.rstrip("\\"))

    # last line contains \
    if new_line:
        assert primary_line_number is not None
        yield primary_line_number, "".join(new_line)

    # TODO: handle space after '\'.
def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Remove comments and surrounding whitespace from each line, and skip the
    lines that end up empty. Line numbers are passed through unchanged.
    """
    for line_number, raw_line in lines_enum:
        cleaned = COMMENT_RE.sub("", raw_line).strip()
        if not cleaned:
            continue
        yield line_number, cleaned
def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute `${NAME}` placeholders with values from `os.getenv`.

    A placeholder whose variable is unset or empty is left in the line
    untouched.

    The only allowed format for environment variables defined in the
    requirement file is `${MY_VARIABLE_1}` to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).
    """
    for line_number, line in lines_enum:
        # The placeholders are collected once, from the line as it came in.
        for placeholder, name in ENV_VAR_RE.findall(line):
            replacement = os.getenv(name)
            if replacement:
                line = line.replace(placeholder, replacement)

        yield line_number, line
def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
    """Fetch the text of a requirements file from a path or a URL.

    http:, https: and file: locations are fetched through ``session``; any
    other value is opened as a local file and decoded with ``auto_decode``.

    :param url:         File path or url.
    :param session:     PipSession instance.
    :returns: (location, content). For URLs the location is the response's
        url; for paths it is ``url`` as given.
    :raises InstallationError: if a local file cannot be opened or read.
    """
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if get_url_scheme(url) in ("http", "https", "file"):
        response = session.get(url)
        raise_for_status(response)
        return response.url, response.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as handle:
            raw_bytes = handle.read()
            content = auto_decode(raw_bytes)
    except OSError as exc:
        raise InstallationError(f"Could not open requirements file: {exc}")
    return url, content