1"""
2Requirements file parsing
3"""
4
5from __future__ import annotations
6
7import codecs
8import locale
9import logging
10import optparse
11import os
12import re
13import shlex
14import sys
15import urllib.parse
16from collections.abc import Generator, Iterable
17from dataclasses import dataclass
18from optparse import Values
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 Callable,
23 NoReturn,
24)
25
26from pip._internal.cli import cmdoptions
27from pip._internal.exceptions import InstallationError, RequirementsFileParseError
28from pip._internal.models.release_control import ReleaseControl
29from pip._internal.models.search_scope import SearchScope
30
31if TYPE_CHECKING:
32 from pip._internal.index.package_finder import PackageFinder
33 from pip._internal.network.session import PipSession
34
# Only parse_requirements is exported as the public entry point.
__all__ = ["parse_requirements"]

# An iterable of (line number, line text) pairs; this is what the
# preprocessing generators (join_lines, ignore_comments,
# expand_env_variables) consume and produce.
ReqFileLines = Iterable[tuple[int, str]]

# A callable that takes one preprocessed line and returns the pair
# (requirement args string, parsed option values).
LineParser = Callable[[str], tuple[str, Values]]

# Matches a leading "http:", "https:" or "file:" scheme, case-insensitively.
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# Matches a '#' comment that either starts the line or follows whitespace,
# through to the end of the line.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
# The "var" group captures the whole '${NAME}' token, "name" captures NAME.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
49
# Option factories for options that take effect on lines that do not contain
# a requirement. build_parser() registers these together with
# SUPPORTED_OPTIONS_REQ on the per-line option parser.
SUPPORTED_OPTIONS: list[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.all_releases,
    cmdoptions.only_final,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# options to be passed to requirements
SUPPORTED_OPTIONS_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# options to be passed to editable requirements
SUPPORTED_OPTIONS_EDITABLE_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.config_settings,
]


# the 'dest' string values
# Each factory is called once at import time to read the option's `dest`;
# handle_requirement_line() uses these names to pick values out of the
# parsed options.
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
    str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ
]
85
# (byte order mark, codec name) pairs tried in order by _decode_req_file().
# order of BOMS is important: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE
# so data.startswith(BOM_UTF16_LE) would be true for UTF32_LE data
BOMS: list[tuple[bytes, str]] = [
    (codecs.BOM_UTF8, "utf-8"),
    (codecs.BOM_UTF32, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32-be"),
    (codecs.BOM_UTF32_LE, "utf-32-le"),
    (codecs.BOM_UTF16, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16-be"),
    (codecs.BOM_UTF16_LE, "utf-16-le"),
]

# Finds a PEP 263 style "coding: <name>" / "coding=<name>" declaration in a
# bytes line; group 1 is the encoding name.
PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")
# Encoding tried when a file has neither a BOM nor a coding declaration.
DEFAULT_ENCODING = "utf-8"

# Module-level logger for messages emitted while parsing requirements files.
logger = logging.getLogger(__name__)
102
103
@dataclass(frozen=True, slots=True)
class ParsedRequirement:
    """A requirement produced from one logical line of a requirements file.

    Instances are created by handle_requirement_line() and yielded by
    parse_requirements().
    """

    # The requirement text: the line's args, or the first editable value.
    requirement: str
    # True when the line carried an editable option.
    is_editable: bool
    # Origin description of the form "-r <file> (line N)" or
    # "-c <file> (line N)".
    comes_from: str
    # True when the line came from a constraints file.
    constraint: bool
    # Per-requirement option values keyed by optparse `dest` name.
    options: dict[str, Any] | None
    # Origin description of the form "line N of <file>".
    line_source: str | None
112
113
114@dataclass(frozen=True, slots=True)
115class ParsedLine:
116 filename: str
117 lineno: int
118 args: str
119 opts: Values
120 constraint: bool
121
122 @property
123 def is_editable(self) -> bool:
124 return bool(self.opts.editables)
125
126 @property
127 def requirement(self) -> str | None:
128 if self.args:
129 return self.args
130 elif self.is_editable:
131 # We don't support multiple -e on one line
132 return self.opts.editables[0]
133 return None
134
135
def parse_requirements(
    filename: str,
    session: PipSession,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily parse a requirements file into ParsedRequirement objects.

    Every parsed line is passed to handle_line(); lines that only carry
    options yield nothing.

    :param filename: Path or url of requirements file.
    :param session: PipSession instance.
    :param finder: Instance of pip.index.PackageFinder.
    :param options: cli options.
    :param constraint: If true, parsing a constraint file rather than
        requirements file.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for parsed_line in file_parser.parse(filename, constraint):
        requirement = handle_line(
            parsed_line, options=options, finder=finder, session=session
        )
        if requirement is None:
            # Option-only line: handle_line() dealt with it, nothing to yield.
            continue
        yield requirement
161
162
def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into an iterator of cleaned, numbered lines.

    Lines are numbered from 1, then passed through join_lines(),
    ignore_comments() and expand_env_variables(), in that order.

    :param content: the content of the requirements file
    """
    numbered_lines: ReqFileLines = enumerate(content.splitlines(), start=1)
    return expand_env_variables(ignore_comments(join_lines(numbered_lines)))
173
174
def handle_requirement_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
) -> ParsedRequirement:
    """Build a ParsedRequirement from a line that contains a requirement.

    :param line: The parsed line; its ``requirement`` must not be None.
    :param options: CLI options (not used by this function).
    """
    # preserve for the nested code path
    origin_flag = "-c" if line.constraint else "-r"
    line_comes_from = "{} {} (line {})".format(
        origin_flag,
        line.filename,
        line.lineno,
    )

    assert line.requirement is not None

    # Editable and regular requirements accept different option sets.
    supported_dest = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )
    # Keep only the supported options that are present with a truthy value.
    parsed_values = vars(line.opts)
    req_options = {
        dest: parsed_values[dest]
        for dest in supported_dest
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )
207
208
def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    session: PipSession | None = None,
) -> None:
    """Apply the options of a requirement-less line.

    :param opts: Option values parsed from the line.
    :param filename: The requirements file the line came from.
    :param lineno: The line's number within that file.
    :param finder: If given, its search scope, release control and binary
        preference are updated from ``opts``.
    :param options: If given, ``require_hashes`` and ``features_enabled``
        from ``opts`` are propagated onto it.
    :param session: If given, receives the index urls (only when a finder
        is also given) and any trusted hosts.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            for feature in opts.features_enabled:
                if feature not in options.features_enabled:
                    options.features_enabled.append(feature)

    # set finder options
    if finder:
        find_links = finder.find_links
        index_urls = finder.index_urls
        no_index = finder.search_scope.no_index
        if opts.no_index is True:
            no_index = True
            index_urls = []
        if not no_index:
            if opts.index_url:
                index_urls = [opts.index_url]
            if opts.extra_index_urls:
                index_urls.extend(opts.extra_index_urls)
        if opts.find_links:
            # FIXME: it would be nice to keep track of the source
            # of the find_links: support a find-links local path
            # relative to a requirements file.
            link = opts.find_links[0]
            relative_to_reqs_file = os.path.join(
                os.path.dirname(os.path.abspath(filename)), link
            )
            # Prefer the path relative to the requirements file if it exists.
            find_links.append(
                relative_to_reqs_file
                if os.path.exists(relative_to_reqs_file)
                else link
            )

        if session:
            # We need to update the auth urls in session
            session.update_index_urls(index_urls)

        finder.search_scope = SearchScope(
            find_links=find_links,
            index_urls=index_urls,
            no_index=no_index,
        )

        # Transform --pre into --all-releases :all:
        if opts.pre:
            if not opts.release_control:
                opts.release_control = ReleaseControl()
            opts.release_control.all_releases.add(":all:")

        # Only the first release_control seen is set on the finder.
        if opts.release_control and not finder.release_control:
            finder.set_release_control(opts.release_control)

        if opts.prefer_binary:
            finder.set_prefer_binary()

    if session:
        source = f"line {lineno} of {filename}"
        for host in opts.trusted_hosts or []:
            session.add_trusted_host(host, source=source)
285
286
def handle_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
    finder: PackageFinder | None = None,
    session: PipSession | None = None,
) -> ParsedRequirement | None:
    """Dispatch a single parsed requirements line.

    A line either produces a requirement or updates the finder, session and
    options; it never does both.

    :param line: The parsed line to be processed.
    :param options: CLI options.
    :param finder: The finder - updated by non-requirement lines.
    :param session: The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For lines that contain requirements, the only options that have an effect
    are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
    requirement. Other options from SUPPORTED_OPTIONS may be present, but are
    ignored.

    For lines that do not contain requirements, the only options that have an
    effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
    be present, but are ignored. These lines may contain multiple options
    (although our docs imply only one is supported), and all are parsed and
    affect the finder.
    """
    if line.requirement is None:
        # Option-only line: apply its side effects and produce nothing.
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)
329
330
class RequirementsFileParser:
    """Parse a requirements file, following nested -r/-c references.

    The parser reads file content through get_file_content(), preprocesses
    it, and runs every resulting line through the supplied line parser.
    """

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        """
        :param session: Session passed to get_file_content() for each file.
        :param line_parser: Callable splitting a line into (args, options).
        """
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        # The root file is recorded with no includer (None).
        yield from self._parse_and_recurse(
            filename, constraint, [{os.path.abspath(filename): None}]
        )

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: list[dict[str, str | None]],
    ) -> Generator[ParsedLine, None, None]:
        """Yield the parsed lines of ``filename`` and of the files it includes.

        :param filename: Path or url of the file to parse.
        :param constraint: Whether ``filename`` is a constraints file.
        :param parsed_files_stack: Its first element maps every file on the
            current include chain to the file that included it (None for
            the root file); it is used to detect recursive references.
        :raises RequirementsFileParseError: If a file on the current include
            chain is referenced again.
        """
        for line in self._parse_file(filename, constraint):
            references_nested_file = line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            )
            if not references_nested_file:
                yield line
                continue

            # parse a nested requirements file
            if line.opts.requirements:
                req_path = line.opts.requirements[0]
                nested_constraint = False
            else:
                req_path = line.opts.constraints[0]
                nested_constraint = True

            # original file is over http
            if SCHEME_RE.search(filename):
                # do a url join so relative paths work
                req_path = urllib.parse.urljoin(filename, req_path)
            # original file and nested file are paths
            elif not SCHEME_RE.search(req_path):
                # do a join so relative paths work
                # and then abspath so that we can identify recursive references
                req_path = os.path.abspath(
                    os.path.join(
                        os.path.dirname(filename),
                        req_path,
                    )
                )

            parsed_files = parsed_files_stack[0]
            if req_path in parsed_files:
                initial_file = parsed_files[req_path]
                tail = (
                    f" and again in {initial_file}"
                    if initial_file is not None
                    else ""
                )
                raise RequirementsFileParseError(
                    f"{req_path} recursively references itself in {filename}{tail}"
                )

            # Keep track of which file first included each nested file. A
            # copy is used so that sibling includes do not see each other.
            new_parsed_files = parsed_files.copy()
            new_parsed_files[req_path] = filename
            yield from self._parse_and_recurse(
                req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
            )

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Yield one ParsedLine per preprocessed line of a single file.

        Nested references are not followed here.

        :raises RequirementsFileParseError: If the line parser rejects a
            line with OptionParsingError.
        """
        _, content = get_file_content(filename, self._session, constraint=constraint)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                # Chain explicitly so the option error is kept as the cause.
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )
422
423
def get_line_parser(finder: PackageFinder | None) -> LineParser:
    """Return a function that splits one line into (args, option values).

    When a finder is given, its format control and release control are used
    as the defaults for the parsed options.
    """

    def parse_line(line: str) -> tuple[str, Values]:
        args_str, options_str = break_args_options(line)

        # Build new parser for each line since it accumulates appendable
        # options.
        line_option_parser = build_parser()
        defaults = line_option_parser.get_default_values()
        defaults.index_url = None
        if finder:
            defaults.format_control = finder.format_control
            defaults.release_control = finder.release_control

        try:
            option_tokens = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        # Leftover positional arguments from optparse are discarded.
        opts = line_option_parser.parse_args(option_tokens, defaults)[0]
        return args_str, opts

    return parse_line
447
448
def break_args_options(line: str) -> tuple[str, str]:
    """Break up the line into an args and options string.

    We only want to shlex (and then optparse) the options, not the args.
    args can contain markers which are corrupted by shlex.

    The line is split on single spaces; everything before the first token
    that starts with ``-`` is the args string, and that token together with
    everything after it is the options string.

    :param line: A preprocessed requirements line.
    :return: The pair ``(args, options)``, each re-joined with single spaces.
    """
    tokens = line.split(" ")
    # Index of the first option-looking token. A "--" prefix is already
    # covered by "-", and slicing avoids popping from the list front.
    first_option = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:first_option]), " ".join(tokens[first_option:])
464
465
class OptionParsingError(Exception):
    """Raised when the options on a requirements line cannot be parsed.

    :param msg: Human-readable description, also available as ``msg``.
    """

    def __init__(self, msg: str) -> None:
        # Initialise the base class so that str(exc) and exc.args carry the
        # message instead of being empty.
        super().__init__(msg)
        self.msg = msg
469
470
def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines

    The parser knows every option in SUPPORTED_OPTIONS and
    SUPPORTED_OPTIONS_REQ and raises OptionParsingError instead of exiting
    the process on a parsing error.
    """
    parser = optparse.OptionParser(add_help_option=False)

    for option_factory in SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ:
        parser.add_option(option_factory())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    # The function is stored on the instance, so it is called unbound: its
    # parameters mirror OptionParser.exit(status=0, msg=None) rather than
    # starting with ``self``.
    def parser_exit(status: int = 0, msg: str | None = None) -> NoReturn:
        raise OptionParsingError(msg or "")

    # NOTE: mypy disallows assigning to a method
    # https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser
492
493
def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    r"""Join each line ending in a backslash with the line that follows it.

    Comment lines are never treated as continuations, even when they end in
    a backslash. A joined line takes on the line number of its first
    physical line.

    :param lines_enum: (line number, line) pairs of physical lines.
    :return: (line number, line) pairs of logical lines.
    """
    primary_line_number = None
    new_line: list[str] = []
    for line_number, line in lines_enum:
        is_comment = COMMENT_RE.match(line) is not None
        if line.endswith("\\") and not is_comment:
            # Continuation: buffer it and remember where the logical line
            # started.
            if not new_line:
                primary_line_number = line_number
            # Only strip the trailing continuation marker; a leading
            # backslash (e.g. a UNC path) is part of the content.
            new_line.append(line.rstrip("\\"))
            continue

        if is_comment:
            # this ensures comments are always matched later
            line = " " + line
        if new_line:
            new_line.append(line)
            assert primary_line_number is not None
            yield primary_line_number, "".join(new_line)
            new_line = []
        else:
            yield line_number, line

    # last line contains \
    if new_line:
        assert primary_line_number is not None
        yield primary_line_number, "".join(new_line)

    # TODO: handle space after '\'.
523
524
def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Remove comments from each line and drop lines that end up empty.

    Surrounding whitespace is stripped from the lines that are kept.
    """
    for line_number, raw_line in lines_enum:
        cleaned = COMMENT_RE.sub("", raw_line).strip()
        if not cleaned:
            continue
        yield line_number, cleaned
534
535
def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Replace all environment variables that can be retrieved via `os.getenv`.

    The only allowed format for environment variables defined in the
    requirement file is `${MY_VARIABLE_1}` to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).

    Variables that are unset or empty are left in the line untouched. Each
    line is expanded in a single pass, so text that comes from a variable's
    value is never expanded again.
    """

    def substitute(match: re.Match[str]) -> str:
        value = os.getenv(match.group("name"))
        # Keep the original '${NAME}' token when there is nothing to insert.
        return value if value else match.group("var")

    for line_number, line in lines_enum:
        yield line_number, ENV_VAR_RE.sub(substitute, line)
561
562
def get_file_content(
    url: str, session: PipSession, *, constraint: bool = False
) -> tuple[str, str]:
    """Gets the content of a file; it may be a filename, file: URL, or
    http: URL. Returns (location, content). Content is unicode.

    http:, https: and file: URLs are fetched through ``session`` and the
    response text is returned as-is. Anything else is treated as a local
    path, read as bytes and decoded by _decode_req_file(), which respects
    # -*- coding: declarations.

    :param url: File path or url.
    :param session: PipSession instance.
    :param constraint: Whether the file is a constraints file; only used to
        word the error message.
    :raises InstallationError: If a local file cannot be opened or read.
    """
    scheme = urllib.parse.urlsplit(url).scheme
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if scheme in ["http", "https", "file"]:
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as f:
            raw_content = f.read()
    except OSError as exc:
        kind = "constraint" if constraint else "requirements"
        # Chain explicitly so the OS error is kept as the cause.
        raise InstallationError(f"Could not open {kind} file: {exc}") from exc

    return url, _decode_req_file(raw_content, url)
594
595
def _decode_req_file(data: bytes, url: str) -> str:
    """Decode the raw bytes of a requirements file to text.

    The encoding is chosen in this order:

    1. a byte order mark listed in BOMS (the mark itself is removed);
    2. a PEP 263 style coding declaration in a comment on one of the first
       two lines;
    3. DEFAULT_ENCODING;
    4. the locale's preferred encoding, with a warning.

    :param data: The raw file content.
    :param url: Where the data came from; only used in the warning.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    # Only the first two lines can carry a coding declaration, so there is
    # no need to split the rest of the payload.
    for line in data.split(b"\n", 2)[:2]:
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.group(1).decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        # Use the module logger rather than the root logger.
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)