1"""
2Requirements file parsing
3"""
4
5from __future__ import annotations
6
7import codecs
8import locale
9import logging
10import optparse
11import os
12import re
13import shlex
14import sys
15import urllib.parse
16from collections.abc import Generator, Iterable
17from dataclasses import dataclass
18from optparse import Values
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 Callable,
23 NoReturn,
24)
25
26from pip._internal.cli import cmdoptions
27from pip._internal.exceptions import InstallationError, RequirementsFileParseError
28from pip._internal.models.release_control import ReleaseControl
29from pip._internal.models.search_scope import SearchScope
30
31if TYPE_CHECKING:
32 from pip._internal.index.package_finder import PackageFinder
33 from pip._internal.network.session import PipSession
34
# Names exported by ``from ... import *``.
__all__ = ["parse_requirements"]

# Stream of (line number, line text) pairs passed between the preprocessing
# steps (join_lines, ignore_comments, expand_env_variables).
ReqFileLines = Iterable[tuple[int, str]]

# Callable turning one preprocessed line into (requirement string, parsed
# option values).
LineParser = Callable[[str], tuple[str, Values]]

# Matches a leading "http:", "https:" or "file:" scheme, case-insensitively.
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# Matches a "#" comment that starts the line or follows whitespace, up to the
# end of the line (the preceding whitespace is part of the match).
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")

# Option factories registered on the per-line parser (see build_parser).
SUPPORTED_OPTIONS: list[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.all_releases,
    cmdoptions.only_final,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# options to be passed to requirements
SUPPORTED_OPTIONS_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# options to be passed to editable requirements
SUPPORTED_OPTIONS_EDITABLE_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.config_settings,
]


# the 'dest' string values
# (each factory is called once at import time just to read its ``dest``)
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
    str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ
]

# order of BOMS is important: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE
# so data.startswith(BOM_UTF16_LE) would be true for UTF32_LE data
BOMS: list[tuple[bytes, str]] = [
    (codecs.BOM_UTF8, "utf-8"),
    (codecs.BOM_UTF32, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32-be"),
    (codecs.BOM_UTF32_LE, "utf-32-le"),
    (codecs.BOM_UTF16, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16-be"),
    (codecs.BOM_UTF16_LE, "utf-16-le"),
]

# Finds a "coding:" / "coding=" declaration in raw bytes and captures the
# encoding name (used on the first two lines of a file, PEP 263 style).
PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")
# Encoding tried when a file has neither a BOM nor a coding declaration.
DEFAULT_ENCODING = "utf-8"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
102
103
@dataclass(frozen=True)
class ParsedRequirement:
    """Immutable record describing one requirement found in a requirements file.

    Instances are created by ``handle_requirement_line``.
    """

    # TODO: replace this with slots=True when dropping Python 3.9 support.
    __slots__ = (
        "requirement",
        "is_editable",
        "comes_from",
        "constraint",
        "options",
        "line_source",
    )

    # Requirement text: the line's args, or its first editable value when the
    # args are empty.
    requirement: str
    # True when the line carried an editable option.
    is_editable: bool
    # Origin description of the form "-r <file> (line <n>)" or
    # "-c <file> (line <n>)".
    comes_from: str
    # True when the line was read from a constraints file.
    constraint: bool
    # Per-requirement option values keyed by option ``dest``.
    options: dict[str, Any] | None
    # Origin description of the form "line <n> of <file>".
    line_source: str | None
122
123
@dataclass(frozen=True)
class ParsedLine:
    """One preprocessed requirements-file line, split into args and options."""

    __slots__ = ("filename", "lineno", "args", "opts", "constraint")

    # File (path or URL) the line was read from.
    filename: str
    # Line number reported for the line within that file.
    lineno: int
    # The part of the line in front of the first option token.
    args: str
    # Option values parsed from the remainder of the line.
    opts: Values
    # Whether the containing file is being parsed as a constraints file.
    constraint: bool

    @property
    def is_editable(self) -> bool:
        """Return True when the line has at least one editable value."""
        return True if self.opts.editables else False

    @property
    def requirement(self) -> str | None:
        """Return the requirement text of the line, or None if it has none.

        The args win when present; otherwise the first editable value is
        used; a pure option line yields None.
        """
        if self.args:
            return self.args
        if not self.is_editable:
            return None
        # We don't support multiple -e on one line
        return self.opts.editables[0]
146
147
def parse_requirements(
    filename: str,
    session: PipSession,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily yield a ParsedRequirement for every requirement line of a file.

    Lines that only carry options produce no output; they are applied to
    ``finder``, ``options`` and ``session`` through ``handle_line``.

    :param filename:    Path or url of requirements file.
    :param session:     PipSession instance.
    :param finder:      Instance of pip.index.PackageFinder.
    :param options:     cli options.
    :param constraint:  If true, parsing a constraint file rather than
        requirements file.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for parsed_line in file_parser.parse(filename, constraint):
        requirement = handle_line(
            parsed_line, options=options, finder=finder, session=session
        )
        if requirement is None:
            # option-only line: already handled, nothing to yield
            continue
        yield requirement
173
174
def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into an iterator of cleaned, numbered lines.

    The content is split into lines numbered from 1, continuation lines are
    joined, comments and empty lines are dropped, and environment variables
    are expanded -- in that order.

    :param content: the content of the requirements file
    """
    numbered_lines: ReqFileLines = enumerate(content.splitlines(), start=1)
    return expand_env_variables(ignore_comments(join_lines(numbered_lines)))
185
186
def handle_requirement_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
) -> ParsedRequirement:
    """Build the ParsedRequirement for a line that holds a requirement.

    :param line: Parsed line; its ``requirement`` must not be None.
    :param options: Accepted for signature compatibility; not read here.
    """
    # preserve for the nested code path
    flag = "-c" if line.constraint else "-r"
    line_comes_from = f"{flag} {line.filename} (line {line.lineno})"

    assert line.requirement is not None

    # Only the options scoped to a requirement are kept; the accepted set
    # depends on whether the requirement is editable.
    supported_dest = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )
    parsed_values = vars(line.opts)
    req_options = {
        dest: parsed_values[dest]
        for dest in supported_dest
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )
219
220
def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    session: PipSession | None = None,
) -> None:
    """Apply an option-only line to the CLI options, finder and session.

    :param opts: Option values parsed from the line.
    :param filename: File the line came from; used in messages and to resolve
        a find-links value relative to that file.
    :param lineno: Line number, used in messages.
    :param finder: Updated with index/find-links/release settings when given.
    :param options: CLI options; hash and feature flags are copied onto it.
    :param session: Receives updated index URLs and trusted hosts when given.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            for feature in opts.features_enabled:
                if feature not in options.features_enabled:
                    options.features_enabled.append(feature)

    # Everything below configures the finder (and the session through it).
    if not finder:
        return

    find_links = finder.find_links
    index_urls = finder.index_urls
    no_index = finder.search_scope.no_index
    if opts.no_index is True:
        no_index = True
        index_urls = []
    if not no_index:
        if opts.index_url:
            index_urls = [opts.index_url]
        if opts.extra_index_urls:
            index_urls.extend(opts.extra_index_urls)
    if opts.find_links:
        # FIXME: it would be nice to keep track of the source
        # of the find_links: support a find-links local path
        # relative to a requirements file.
        value = opts.find_links[0]
        candidate = os.path.join(os.path.dirname(os.path.abspath(filename)), value)
        if os.path.exists(candidate):
            value = candidate
        find_links.append(value)

    if session:
        # We need to update the auth urls in session
        session.update_index_urls(index_urls)

    finder.search_scope = SearchScope(
        find_links=find_links,
        index_urls=index_urls,
        no_index=no_index,
    )

    # Transform --pre into --all-releases :all:
    if opts.pre:
        if not opts.release_control:
            opts.release_control = ReleaseControl()
        opts.release_control.all_releases.add(":all:")

    if opts.release_control and not finder.release_control:
        # First time seeing release_control, set it on finder
        finder.set_release_control(opts.release_control)

    if opts.prefer_binary:
        finder.set_prefer_binary()

    if session:
        source = f"line {lineno} of {filename}"
        for host in opts.trusted_hosts or []:
            session.add_trusted_host(host, source=source)
297
298
def handle_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
    finder: PackageFinder | None = None,
    session: PipSession | None = None,
) -> ParsedRequirement | None:
    """Dispatch one parsed line to the requirement or the option handler.

    :param line: The parsed line to be processed.
    :param options: CLI options.
    :param finder: The finder - updated by non-requirement lines.
    :param session: The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For lines that contain requirements, the only options that have an effect
    are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
    requirement. Other options from SUPPORTED_OPTIONS may be present, but are
    ignored.

    For lines that do not contain requirements, the only options that have an
    effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
    be present, but are ignored. These lines may contain multiple options
    (although our docs imply only one is supported), and all are parsed and
    affect the finder.
    """
    if line.requirement is None:
        # Option-only line: applied for its side effects, yields nothing.
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)
341
342
class RequirementsFileParser:
    """Parse a requirements file into ParsedLine objects, following nested files.

    File content is fetched through ``get_file_content`` with the given
    session, and every preprocessed line is handed to ``line_parser``.
    """

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        # Nested references are recorded as absolute paths for local files but
        # verbatim for URLs, so key the root file the same way. Running a URL
        # through os.path.abspath() produces a key that no nested reference
        # can match, which delays detection of a self-including URL.
        if SCHEME_RE.search(filename):
            root_key = filename
        else:
            root_key = os.path.abspath(filename)
        yield from self._parse_and_recurse(filename, constraint, [{root_key: None}])

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: list[dict[str, str | None]],
    ) -> Generator[ParsedLine, None, None]:
        """Yield the lines of ``filename``, expanding nested file references.

        A line without a requirement that names a nested requirements or
        constraints file is replaced by that file's lines; every other line
        is yielded as is.

        :param filename: Path or URL of the file being parsed.
        :param constraint: Whether ``filename`` is parsed as a constraints file.
        :param parsed_files_stack: Its first entry maps each file on the
            current include chain to the file that included it (None for the
            root file).
        :raises RequirementsFileParseError: If a nested file is already on the
            current include chain.
        """
        for line in self._parse_file(filename, constraint):
            if line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                # (only the first value is used; requirements win over
                # constraints when both are present)
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work
                    # and then abspath so that we can identify recursive references
                    req_path = os.path.abspath(
                        os.path.join(
                            os.path.dirname(filename),
                            req_path,
                        )
                    )
                parsed_files = parsed_files_stack[0]
                if req_path in parsed_files:
                    initial_file = parsed_files[req_path]
                    tail = (
                        f" and again in {initial_file}"
                        if initial_file is not None
                        else ""
                    )
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself in {filename}{tail}"
                    )
                # Keeping a track where was each file first included in
                # (copied so that sibling includes do not see each other)
                new_parsed_files = parsed_files.copy()
                new_parsed_files[req_path] = filename
                yield from self._parse_and_recurse(
                    req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Fetch, preprocess and parse one file, without following nested files.

        :raises RequirementsFileParseError: If a line's options cannot be
            parsed; the message includes the offending line.
        """
        _, content = get_file_content(filename, self._session, constraint=constraint)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )
434
435
def get_line_parser(finder: PackageFinder | None) -> LineParser:
    """Return a function that parses one preprocessed requirements-file line.

    :param finder: When given, its format and release controls seed the
        defaults of every parsed line.
    """

    def parse_line(line: str) -> tuple[str, Values]:
        """Split ``line`` into its requirement string and parsed options.

        :raises OptionParsingError: If the option part cannot be tokenized or
            parsed.
        """
        # Build new parser for each line since it accumulates appendable
        # options.
        line_parser = build_parser()
        defaults = line_parser.get_default_values()
        defaults.index_url = None
        if finder:
            defaults.format_control = finder.format_control
            defaults.release_control = finder.release_control

        args_str, options_str = break_args_options(line)

        try:
            option_tokens = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        # Leftover positional arguments are deliberately discarded.
        parsed_opts = line_parser.parse_args(option_tokens, defaults)[0]
        return args_str, parsed_opts

    return parse_line
459
460
def break_args_options(line: str) -> tuple[str, str]:
    """Break up the line into an args and options string.  We only want to shlex
    (and then optparse) the options, not the args.  args can contain markers
    which are corrupted by shlex.

    The line is split on single spaces; everything before the first token
    that starts with "-" is the args string, that token and everything after
    it is the options string. Either part may be empty.

    :param line: A single preprocessed requirements-file line.
    :returns: ``(args, options)``, each re-joined with single spaces.
    """
    tokens = line.split(" ")
    # Any token starting with "--" also starts with "-", so one prefix check
    # is enough. Slicing at the first option avoids popping the head of a
    # list once per argument token.
    first_option = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:first_option]), " ".join(tokens[first_option:])
476
477
class OptionParsingError(Exception):
    """Raised when the option part of a requirements-file line cannot be parsed.

    :param msg: Human-readable description, also available as ``self.msg``.
    """

    def __init__(self, msg: str) -> None:
        # Initialise the base class explicitly so ``args`` and ``str()`` carry
        # the message however the exception is constructed (positionally or
        # by keyword).
        super().__init__(msg)
        self.msg = msg
481
482
def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines

    The parser knows every option from SUPPORTED_OPTIONS and
    SUPPORTED_OPTIONS_REQ and raises OptionParsingError instead of exiting.
    """
    parser = optparse.OptionParser(add_help_option=False)

    for make_option in [*SUPPORTED_OPTIONS, *SUPPORTED_OPTIONS_REQ]:
        parser.add_option(make_option())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    # The function is stored on the instance, so it is not bound: the
    # parameter named ``self`` just receives the first positional argument
    # of the ``exit(...)`` call.
    def parser_exit(self: Any, msg: str) -> NoReturn:
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    #       https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser
504
505
def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    """Merge backslash-continued lines into single logical lines.

    A line ending in a backslash is glued to the line(s) following it, unless
    it is a comment line. Backslashes at either end of a continued line are
    stripped before joining. The joined line takes on the index of the first
    line.
    """
    first_line_number = None
    pending: list[str] = []
    for line_number, line in lines_enum:
        is_comment = COMMENT_RE.match(line)
        if line.endswith("\\") and not is_comment:
            # Continuation: buffer the text until the closing line shows up.
            if not pending:
                first_line_number = line_number
            pending.append(line.strip("\\"))
            continue

        if is_comment:
            # this ensures comments are always matched later
            line = " " + line
        if not pending:
            yield line_number, line
            continue

        # This line closes a run of continued lines.
        pending.append(line)
        assert first_line_number is not None
        yield first_line_number, "".join(pending)
        pending = []

    # last line contains \
    if pending:
        assert first_line_number is not None
        yield first_line_number, "".join(pending)

    # TODO: handle space after '\'.
535
536
def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Remove comments and surrounding whitespace; drop lines left empty.
    """
    for line_number, raw_line in lines_enum:
        cleaned = COMMENT_RE.sub("", raw_line).strip()
        if not cleaned:
            continue
        yield line_number, cleaned
546
547
def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute ``${NAME}`` placeholders with values from the environment.

    Only the `${MY_VARIABLE_1}` form is recognised, to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).

    Placeholders whose variable is unset or empty are left untouched.
    """
    for line_number, line in lines_enum:
        expanded = line
        # Placeholders are collected from the original text; each one found
        # is then replaced everywhere in the line built up so far.
        for placeholder in ENV_VAR_RE.finditer(line):
            value = os.getenv(placeholder.group("name"))
            if value:
                expanded = expanded.replace(placeholder.group("var"), value)

        yield line_number, expanded
573
574
def get_file_content(
    url: str, session: PipSession, *, constraint: bool = False
) -> tuple[str, str]:
    """Fetch a requirements file given as a path, file: URL or http(s): URL.

    Returns (location, content).  Content is unicode.
    Local paths are decoded by ``_decode_req_file``, which respects
    # -*- coding: declarations; URLs use the text of the session's response.

    :param url:         File path or url.
    :param session:     PipSession instance.
    :param constraint:  Only selects the wording ("constraint" or
        "requirements") of the error raised for an unreadable local file.
    :raises InstallationError: If a local file cannot be opened or read.
    """
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if urllib.parse.urlsplit(url).scheme in ("http", "https", "file"):
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        response = session.get(url)
        raise_for_status(response)
        return response.url, response.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as req_file:
            raw_content = req_file.read()
    except OSError as exc:
        kind = "constraint" if constraint else "requirements"
        raise InstallationError(f"Could not open {kind} file: {exc}")

    return url, _decode_req_file(raw_content, url)
606
607
def _decode_req_file(data: bytes, url: str) -> str:
    """Decode the raw bytes of a requirements file to text.

    The encoding is chosen in this order:

    1. a byte order mark listed in BOMS (the mark itself is dropped);
    2. a PEP 263 style coding declaration in a "#" comment on one of the
       first two lines;
    3. DEFAULT_ENCODING;
    4. the locale's preferred encoding, with a warning.

    :param data: Raw file content.
    :param url: Where the data came from; only used in the warning message.
    :raises UnicodeDecodeError: If the data cannot be decoded with the
        encoding selected by steps 1, 2 or 4.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    # Only the first two lines can hold the declaration, so there is no need
    # to split the whole file.
    for line in data.split(b"\n", 2)[:2]:
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.group(1).decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        # Use the module logger rather than the root-level logging.warning(),
        # which can call basicConfig() on the root logger as a side effect.
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)