Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_file.py: 82%
234 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1"""
2Requirements file parsing
3"""
5import logging
6import optparse
7import os
8import re
9import shlex
10import urllib.parse
11from optparse import Values
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Callable,
16 Dict,
17 Generator,
18 Iterable,
19 List,
20 NoReturn,
21 Optional,
22 Tuple,
23)
25from pip._internal.cli import cmdoptions
26from pip._internal.exceptions import InstallationError, RequirementsFileParseError
27from pip._internal.models.search_scope import SearchScope
28from pip._internal.network.session import PipSession
29from pip._internal.network.utils import raise_for_status
30from pip._internal.utils.encoding import auto_decode
31from pip._internal.utils.urls import get_url_scheme
33if TYPE_CHECKING:
34 from pip._internal.index.package_finder import PackageFinder
36__all__ = ["parse_requirements"]
38ReqFileLines = Iterable[Tuple[int, str]]
40LineParser = Callable[[str], Tuple[str, Values]]
42SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
43COMMENT_RE = re.compile(r"(^|\s+)#.*$")
45# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
46# variable name consisting of only uppercase letters, digits or the '_'
47# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
48# 2013 Edition.
49ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
51SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
52 cmdoptions.index_url,
53 cmdoptions.extra_index_url,
54 cmdoptions.no_index,
55 cmdoptions.constraints,
56 cmdoptions.requirements,
57 cmdoptions.editable,
58 cmdoptions.find_links,
59 cmdoptions.no_binary,
60 cmdoptions.only_binary,
61 cmdoptions.prefer_binary,
62 cmdoptions.require_hashes,
63 cmdoptions.pre,
64 cmdoptions.trusted_host,
65 cmdoptions.use_new_feature,
66]
68# options to be passed to requirements
69SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [
70 cmdoptions.global_options,
71 cmdoptions.hash,
72 cmdoptions.config_settings,
73]
75SUPPORTED_OPTIONS_EDITABLE_REQ: List[Callable[..., optparse.Option]] = [
76 cmdoptions.config_settings,
77]
80# the 'dest' string values
81SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
82SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
83 str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ
84]
86logger = logging.getLogger(__name__)
89class ParsedRequirement:
90 def __init__(
91 self,
92 requirement: str,
93 is_editable: bool,
94 comes_from: str,
95 constraint: bool,
96 options: Optional[Dict[str, Any]] = None,
97 line_source: Optional[str] = None,
98 ) -> None:
99 self.requirement = requirement
100 self.is_editable = is_editable
101 self.comes_from = comes_from
102 self.options = options
103 self.constraint = constraint
104 self.line_source = line_source
107class ParsedLine:
108 def __init__(
109 self,
110 filename: str,
111 lineno: int,
112 args: str,
113 opts: Values,
114 constraint: bool,
115 ) -> None:
116 self.filename = filename
117 self.lineno = lineno
118 self.opts = opts
119 self.constraint = constraint
121 if args:
122 self.is_requirement = True
123 self.is_editable = False
124 self.requirement = args
125 elif opts.editables:
126 self.is_requirement = True
127 self.is_editable = True
128 # We don't support multiple -e on one line
129 self.requirement = opts.editables[0]
130 else:
131 self.is_requirement = False
134def parse_requirements(
135 filename: str,
136 session: PipSession,
137 finder: Optional["PackageFinder"] = None,
138 options: Optional[optparse.Values] = None,
139 constraint: bool = False,
140) -> Generator[ParsedRequirement, None, None]:
141 """Parse a requirements file and yield ParsedRequirement instances.
143 :param filename: Path or url of requirements file.
144 :param session: PipSession instance.
145 :param finder: Instance of pip.index.PackageFinder.
146 :param options: cli options.
147 :param constraint: If true, parsing a constraint file rather than
148 requirements file.
149 """
150 line_parser = get_line_parser(finder)
151 parser = RequirementsFileParser(session, line_parser)
153 for parsed_line in parser.parse(filename, constraint):
154 parsed_req = handle_line(
155 parsed_line, options=options, finder=finder, session=session
156 )
157 if parsed_req is not None:
158 yield parsed_req
161def preprocess(content: str) -> ReqFileLines:
162 """Split, filter, and join lines, and return a line iterator
164 :param content: the content of the requirements file
165 """
166 lines_enum: ReqFileLines = enumerate(content.splitlines(), start=1)
167 lines_enum = join_lines(lines_enum)
168 lines_enum = ignore_comments(lines_enum)
169 lines_enum = expand_env_variables(lines_enum)
170 return lines_enum
173def handle_requirement_line(
174 line: ParsedLine,
175 options: Optional[optparse.Values] = None,
176) -> ParsedRequirement:
177 # preserve for the nested code path
178 line_comes_from = "{} {} (line {})".format(
179 "-c" if line.constraint else "-r",
180 line.filename,
181 line.lineno,
182 )
184 assert line.is_requirement
186 # get the options that apply to requirements
187 if line.is_editable:
188 supported_dest = SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
189 else:
190 supported_dest = SUPPORTED_OPTIONS_REQ_DEST
191 req_options = {}
192 for dest in supported_dest:
193 if dest in line.opts.__dict__ and line.opts.__dict__[dest]:
194 req_options[dest] = line.opts.__dict__[dest]
196 line_source = f"line {line.lineno} of {line.filename}"
197 return ParsedRequirement(
198 requirement=line.requirement,
199 is_editable=line.is_editable,
200 comes_from=line_comes_from,
201 constraint=line.constraint,
202 options=req_options,
203 line_source=line_source,
204 )
207def handle_option_line(
208 opts: Values,
209 filename: str,
210 lineno: int,
211 finder: Optional["PackageFinder"] = None,
212 options: Optional[optparse.Values] = None,
213 session: Optional[PipSession] = None,
214) -> None:
215 if opts.hashes:
216 logger.warning(
217 "%s line %s has --hash but no requirement, and will be ignored.",
218 filename,
219 lineno,
220 )
222 if options:
223 # percolate options upward
224 if opts.require_hashes:
225 options.require_hashes = opts.require_hashes
226 if opts.features_enabled:
227 options.features_enabled.extend(
228 f for f in opts.features_enabled if f not in options.features_enabled
229 )
231 # set finder options
232 if finder:
233 find_links = finder.find_links
234 index_urls = finder.index_urls
235 no_index = finder.search_scope.no_index
236 if opts.no_index is True:
237 no_index = True
238 index_urls = []
239 if opts.index_url and not no_index:
240 index_urls = [opts.index_url]
241 if opts.extra_index_urls and not no_index:
242 index_urls.extend(opts.extra_index_urls)
243 if opts.find_links:
244 # FIXME: it would be nice to keep track of the source
245 # of the find_links: support a find-links local path
246 # relative to a requirements file.
247 value = opts.find_links[0]
248 req_dir = os.path.dirname(os.path.abspath(filename))
249 relative_to_reqs_file = os.path.join(req_dir, value)
250 if os.path.exists(relative_to_reqs_file):
251 value = relative_to_reqs_file
252 find_links.append(value)
254 if session:
255 # We need to update the auth urls in session
256 session.update_index_urls(index_urls)
258 search_scope = SearchScope(
259 find_links=find_links,
260 index_urls=index_urls,
261 no_index=no_index,
262 )
263 finder.search_scope = search_scope
265 if opts.pre:
266 finder.set_allow_all_prereleases()
268 if opts.prefer_binary:
269 finder.set_prefer_binary()
271 if session:
272 for host in opts.trusted_hosts or []:
273 source = f"line {lineno} of {filename}"
274 session.add_trusted_host(host, source=source)
277def handle_line(
278 line: ParsedLine,
279 options: Optional[optparse.Values] = None,
280 finder: Optional["PackageFinder"] = None,
281 session: Optional[PipSession] = None,
282) -> Optional[ParsedRequirement]:
283 """Handle a single parsed requirements line; This can result in
284 creating/yielding requirements, or updating the finder.
286 :param line: The parsed line to be processed.
287 :param options: CLI options.
288 :param finder: The finder - updated by non-requirement lines.
289 :param session: The session - updated by non-requirement lines.
291 Returns a ParsedRequirement object if the line is a requirement line,
292 otherwise returns None.
294 For lines that contain requirements, the only options that have an effect
295 are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
296 requirement. Other options from SUPPORTED_OPTIONS may be present, but are
297 ignored.
299 For lines that do not contain requirements, the only options that have an
300 effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
301 be present, but are ignored. These lines may contain multiple options
302 (although our docs imply only one is supported), and all our parsed and
303 affect the finder.
304 """
306 if line.is_requirement:
307 parsed_req = handle_requirement_line(line, options)
308 return parsed_req
309 else:
310 handle_option_line(
311 line.opts,
312 line.filename,
313 line.lineno,
314 finder,
315 options,
316 session,
317 )
318 return None
321class RequirementsFileParser:
322 def __init__(
323 self,
324 session: PipSession,
325 line_parser: LineParser,
326 ) -> None:
327 self._session = session
328 self._line_parser = line_parser
330 def parse(
331 self, filename: str, constraint: bool
332 ) -> Generator[ParsedLine, None, None]:
333 """Parse a given file, yielding parsed lines."""
334 yield from self._parse_and_recurse(filename, constraint)
336 def _parse_and_recurse(
337 self, filename: str, constraint: bool
338 ) -> Generator[ParsedLine, None, None]:
339 for line in self._parse_file(filename, constraint):
340 if not line.is_requirement and (
341 line.opts.requirements or line.opts.constraints
342 ):
343 # parse a nested requirements file
344 if line.opts.requirements:
345 req_path = line.opts.requirements[0]
346 nested_constraint = False
347 else:
348 req_path = line.opts.constraints[0]
349 nested_constraint = True
351 # original file is over http
352 if SCHEME_RE.search(filename):
353 # do a url join so relative paths work
354 req_path = urllib.parse.urljoin(filename, req_path)
355 # original file and nested file are paths
356 elif not SCHEME_RE.search(req_path):
357 # do a join so relative paths work
358 req_path = os.path.join(
359 os.path.dirname(filename),
360 req_path,
361 )
363 yield from self._parse_and_recurse(req_path, nested_constraint)
364 else:
365 yield line
367 def _parse_file(
368 self, filename: str, constraint: bool
369 ) -> Generator[ParsedLine, None, None]:
370 _, content = get_file_content(filename, self._session)
372 lines_enum = preprocess(content)
374 for line_number, line in lines_enum:
375 try:
376 args_str, opts = self._line_parser(line)
377 except OptionParsingError as e:
378 # add offending line
379 msg = f"Invalid requirement: {line}\n{e.msg}"
380 raise RequirementsFileParseError(msg)
382 yield ParsedLine(
383 filename,
384 line_number,
385 args_str,
386 opts,
387 constraint,
388 )
391def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
392 def parse_line(line: str) -> Tuple[str, Values]:
393 # Build new parser for each line since it accumulates appendable
394 # options.
395 parser = build_parser()
396 defaults = parser.get_default_values()
397 defaults.index_url = None
398 if finder:
399 defaults.format_control = finder.format_control
401 args_str, options_str = break_args_options(line)
403 try:
404 options = shlex.split(options_str)
405 except ValueError as e:
406 raise OptionParsingError(f"Could not split options: {options_str}") from e
408 opts, _ = parser.parse_args(options, defaults)
410 return args_str, opts
412 return parse_line
415def break_args_options(line: str) -> Tuple[str, str]:
416 """Break up the line into an args and options string. We only want to shlex
417 (and then optparse) the options, not the args. args can contain markers
418 which are corrupted by shlex.
419 """
420 tokens = line.split(" ")
421 args = []
422 options = tokens[:]
423 for token in tokens:
424 if token.startswith("-") or token.startswith("--"):
425 break
426 else:
427 args.append(token)
428 options.pop(0)
429 return " ".join(args), " ".join(options)
432class OptionParsingError(Exception):
433 def __init__(self, msg: str) -> None:
434 self.msg = msg
437def build_parser() -> optparse.OptionParser:
438 """
439 Return a parser for parsing requirement lines
440 """
441 parser = optparse.OptionParser(add_help_option=False)
443 option_factories = SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ
444 for option_factory in option_factories:
445 option = option_factory()
446 parser.add_option(option)
448 # By default optparse sys.exits on parsing errors. We want to wrap
449 # that in our own exception.
450 def parser_exit(self: Any, msg: str) -> "NoReturn":
451 raise OptionParsingError(msg)
453 # NOTE: mypy disallows assigning to a method
454 # https://github.com/python/mypy/issues/2427
455 parser.exit = parser_exit # type: ignore
457 return parser
460def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
461 """Joins a line ending in '\' with the previous line (except when following
462 comments). The joined line takes on the index of the first line.
463 """
464 primary_line_number = None
465 new_line: List[str] = []
466 for line_number, line in lines_enum:
467 if not line.endswith("\\") or COMMENT_RE.match(line):
468 if COMMENT_RE.match(line):
469 # this ensures comments are always matched later
470 line = " " + line
471 if new_line:
472 new_line.append(line)
473 assert primary_line_number is not None
474 yield primary_line_number, "".join(new_line)
475 new_line = []
476 else:
477 yield line_number, line
478 else:
479 if not new_line:
480 primary_line_number = line_number
481 new_line.append(line.strip("\\"))
483 # last line contains \
484 if new_line:
485 assert primary_line_number is not None
486 yield primary_line_number, "".join(new_line)
488 # TODO: handle space after '\'.
491def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
492 """
493 Strips comments and filter empty lines.
494 """
495 for line_number, line in lines_enum:
496 line = COMMENT_RE.sub("", line)
497 line = line.strip()
498 if line:
499 yield line_number, line
502def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
503 """Replace all environment variables that can be retrieved via `os.getenv`.
505 The only allowed format for environment variables defined in the
506 requirement file is `${MY_VARIABLE_1}` to ensure two things:
508 1. Strings that contain a `$` aren't accidentally (partially) expanded.
509 2. Ensure consistency across platforms for requirement files.
511 These points are the result of a discussion on the `github pull
512 request #3514 <https://github.com/pypa/pip/pull/3514>`_.
514 Valid characters in variable names follow the `POSIX standard
515 <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
516 to uppercase letter, digits and the `_` (underscore).
517 """
518 for line_number, line in lines_enum:
519 for env_var, var_name in ENV_VAR_RE.findall(line):
520 value = os.getenv(var_name)
521 if not value:
522 continue
524 line = line.replace(env_var, value)
526 yield line_number, line
529def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
530 """Gets the content of a file; it may be a filename, file: URL, or
531 http: URL. Returns (location, content). Content is unicode.
532 Respects # -*- coding: declarations on the retrieved files.
534 :param url: File path or url.
535 :param session: PipSession instance.
536 """
537 scheme = get_url_scheme(url)
539 # Pip has special support for file:// URLs (LocalFSAdapter).
540 if scheme in ["http", "https", "file"]:
541 resp = session.get(url)
542 raise_for_status(resp)
543 return resp.url, resp.text
545 # Assume this is a bare path.
546 try:
547 with open(url, "rb") as f:
548 content = auto_decode(f.read())
549 except OSError as exc:
550 raise InstallationError(f"Could not open requirements file: {exc}")
551 return url, content