1"""
2Requirements file parsing
3"""
4
5from __future__ import annotations
6
7import codecs
8import locale
9import logging
10import optparse
11import os
12import re
13import shlex
14import sys
15import urllib.parse
16from collections.abc import Callable, Generator, Iterable
17from dataclasses import dataclass
18from optparse import Values
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 NoReturn,
23)
24
25from pip._internal.cli import cmdoptions
26from pip._internal.exceptions import InstallationError, RequirementsFileParseError
27from pip._internal.models.release_control import ReleaseControl
28from pip._internal.models.search_scope import SearchScope
29
30if TYPE_CHECKING:
31 from pip._internal.index.package_finder import PackageFinder
32 from pip._internal.network.session import PipSession
33
# Names exported by ``from <this module> import *``.
__all__ = ["parse_requirements"]

# Stream of (line number, line text) pairs, as passed between the
# preprocessing helpers in this module.
ReqFileLines = Iterable[tuple[int, str]]

# Callable that takes one preprocessed line and returns the pair
# (args string, parsed option values).
LineParser = Callable[[str], tuple[str, Values]]

# Case-insensitive match for a location that starts with an "http:",
# "https:" or "file:" scheme.
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# A "#" at the start of the line or preceded by whitespace, through to the
# end of the line.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")

# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
# Group "var" is the whole '${NAME}' token, group "name" is just NAME.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
48
# Option factories for the general options; build_parser() registers these
# together with SUPPORTED_OPTIONS_REQ.
SUPPORTED_OPTIONS: list[Callable[..., optparse.Option]] = [
    cmdoptions.index_url,
    cmdoptions.extra_index_url,
    cmdoptions.no_index,
    cmdoptions.constraints,
    cmdoptions.requirements,
    cmdoptions.editable,
    cmdoptions.find_links,
    cmdoptions.no_binary,
    cmdoptions.only_binary,
    cmdoptions.prefer_binary,
    cmdoptions.require_hashes,
    cmdoptions.pre,
    cmdoptions.all_releases,
    cmdoptions.only_final,
    cmdoptions.trusted_host,
    cmdoptions.use_new_feature,
]

# options to be passed to requirements
SUPPORTED_OPTIONS_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.hash,
    cmdoptions.config_settings,
]

# options to be passed to editable requirements
SUPPORTED_OPTIONS_EDITABLE_REQ: list[Callable[..., optparse.Option]] = [
    cmdoptions.config_settings,
]


# the 'dest' string values
# (handle_requirement_line() uses them to pick values out of the parsed options)
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
    str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ
]
84
# (byte-order mark, codec name) pairs checked in order by _decode_req_file().
# order of BOMS is important: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE
# so data.startswith(BOM_UTF16_LE) would be true for UTF32_LE data
BOMS: list[tuple[bytes, str]] = [
    (codecs.BOM_UTF8, "utf-8"),
    (codecs.BOM_UTF32, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32-be"),
    (codecs.BOM_UTF32_LE, "utf-32-le"),
    (codecs.BOM_UTF16, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16-be"),
    (codecs.BOM_UTF16_LE, "utf-16-le"),
]

# Bytes pattern for a "coding:" / "coding=" declaration; group 1 is the
# encoding name. _decode_req_file() searches for it only in the first two
# lines, and only when the line starts with "#".
PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")
# Encoding tried when a file has neither a BOM nor a coding declaration.
DEFAULT_ENCODING = "utf-8"

# Module-level logger.
logger = logging.getLogger(__name__)
101
102
@dataclass(frozen=True, slots=True)
class ParsedRequirement:
    """A requirement extracted from one line of a requirements file.

    Instances are built by handle_requirement_line(); the field comments
    below describe the values that function supplies.
    """

    # Requirement text: the line's args, or the first -e value when the line
    # has no args.
    requirement: str
    # True when the line carried an editable (-e) option.
    is_editable: bool
    # Origin string of the form "-r <file> (line N)" or "-c <file> (line N)".
    comes_from: str
    # True when the line came from a constraints file.
    constraint: bool
    # Truthy per-requirement option values, keyed by option 'dest'.
    options: dict[str, Any] | None
    # Origin string of the form "line N of <file>".
    line_source: str | None
111
112
113@dataclass(frozen=True, slots=True)
114class ParsedLine:
115 filename: str
116 lineno: int
117 args: str
118 opts: Values
119 constraint: bool
120
121 @property
122 def is_editable(self) -> bool:
123 return bool(self.opts.editables)
124
125 @property
126 def requirement(self) -> str | None:
127 if self.args:
128 return self.args
129 elif self.is_editable:
130 # We don't support multiple -e on one line
131 return self.opts.editables[0]
132 return None
133
134
def parse_requirements(
    filename: str,
    session: PipSession,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Lazily yield a ParsedRequirement for each requirement line of a file.

    Lines that only carry options are applied through handle_line() and
    produce no output.

    :param filename: Path or url of requirements file.
    :param session: PipSession instance.
    :param finder: Optional PackageFinder, passed to the line parser and to
        handle_line().
    :param options: cli options.
    :param constraint: If true, parsing a constraint file rather than
        requirements file.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for parsed_line in file_parser.parse(filename, constraint):
        requirement = handle_line(
            parsed_line, options=options, finder=finder, session=session
        )
        if requirement is None:
            # Options-only line: nothing to hand back to the caller.
            continue
        yield requirement
160
161
def preprocess(content: str) -> ReqFileLines:
    """Turn raw file content into a lazy stream of cleaned, numbered lines.

    Lines are numbered from 1, then passed through join_lines(),
    ignore_comments() and expand_env_variables(), in that order.

    :param content: the content of the requirements file
    """
    numbered: ReqFileLines = enumerate(content.splitlines(), start=1)
    return expand_env_variables(ignore_comments(join_lines(numbered)))
172
173
def handle_requirement_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
) -> ParsedRequirement:
    """Build the ParsedRequirement for a line that carries a requirement.

    :param line: Parsed line; its ``requirement`` must not be None.
    :param options: Accepted for signature compatibility; not read here.
    """
    requirement = line.requirement
    assert requirement is not None

    editable = line.is_editable

    # preserve for the nested code path
    origin = "{} {} (line {})".format(
        "-c" if line.constraint else "-r",
        line.filename,
        line.lineno,
    )

    # Only the per-requirement options are kept, and only when they have a
    # truthy value on this line.
    allowed_dests = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST if editable else SUPPORTED_OPTIONS_REQ_DEST
    )
    parsed_values = line.opts.__dict__
    per_requirement_options = {
        dest: parsed_values[dest]
        for dest in allowed_dests
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=requirement,
        is_editable=editable,
        comes_from=origin,
        constraint=line.constraint,
        options=per_requirement_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )
206
207
def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: PackageFinder | None = None,
    options: optparse.Values | None = None,
    session: PipSession | None = None,
) -> None:
    """Apply the options of a line that carries no requirement.

    :param opts: Option values parsed from the line.
    :param filename: File the line came from (used in messages and to resolve
        a relative --find-links value).
    :param lineno: Line number within ``filename`` (used in messages).
    :param finder: When given, its search scope, release control and
        prefer-binary setting are updated from ``opts``.
    :param options: When given (and truthy), ``require_hashes`` and
        ``features_enabled`` from the line are copied onto it.
    :param session: When given together with ``finder``, receives the
        resulting index urls and any trusted hosts.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            # Only add features that are not already enabled.
            options.features_enabled.extend(
                f for f in opts.features_enabled if f not in options.features_enabled
            )

    # set finder options
    if finder:
        # NOTE(review): these are the objects returned by the finder, so the
        # extend()/append() calls below appear to mutate them in place unless
        # index_urls has been rebound first — confirm that is intended.
        find_links = finder.find_links
        index_urls = finder.index_urls
        no_index = finder.search_scope.no_index
        if opts.no_index is True:
            no_index = True
            index_urls = []
        # Index options on the line are ignored once no_index is in effect.
        if opts.index_url and not no_index:
            index_urls = [opts.index_url]
        if opts.extra_index_urls and not no_index:
            index_urls.extend(opts.extra_index_urls)
        if opts.find_links:
            # FIXME: it would be nice to keep track of the source
            # of the find_links: support a find-links local path
            # relative to a requirements file.
            # Only the first --find-links value on the line is used.
            value = opts.find_links[0]
            req_dir = os.path.dirname(os.path.abspath(filename))
            relative_to_reqs_file = os.path.join(req_dir, value)
            # Prefer the path relative to the requirements file when it exists;
            # otherwise keep the value exactly as written.
            if os.path.exists(relative_to_reqs_file):
                value = relative_to_reqs_file
            find_links.append(value)

        if session:
            # We need to update the auth urls in session
            session.update_index_urls(index_urls)

        # Replace the finder's search scope with one built from the values
        # computed above.
        search_scope = SearchScope(
            find_links=find_links,
            index_urls=index_urls,
            no_index=no_index,
        )
        finder.search_scope = search_scope

        # Transform --pre into --all-releases :all:
        if opts.pre:
            if not opts.release_control:
                opts.release_control = ReleaseControl()
            opts.release_control.all_releases.add(":all:")

        if opts.release_control:
            # A release control already present (truthy) on the finder is
            # left untouched.
            if not finder.release_control:
                # First time seeing release_control, set it on finder
                finder.set_release_control(opts.release_control)

        if opts.prefer_binary:
            finder.set_prefer_binary()

        if session:
            for host in opts.trusted_hosts or []:
                source = f"line {lineno} of {filename}"
                session.add_trusted_host(host, source=source)
284
285
def handle_line(
    line: ParsedLine,
    options: optparse.Values | None = None,
    finder: PackageFinder | None = None,
    session: PipSession | None = None,
) -> ParsedRequirement | None:
    """Dispatch one parsed line to the requirement or the option handler.

    :param line: The parsed line to be processed.
    :param options: CLI options.
    :param finder: The finder - updated by non-requirement lines.
    :param session: The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    Requirement lines: only options from SUPPORTED_OPTIONS_REQ take effect,
    and they are scoped to that requirement. Options from SUPPORTED_OPTIONS
    may be present, but are ignored.

    Non-requirement lines: only options from SUPPORTED_OPTIONS take effect.
    Options from SUPPORTED_OPTIONS_REQ may be present, but are ignored. Such
    a line may contain multiple options (although our docs imply only one is
    supported); all of them are parsed and affect the finder.
    """
    if line.requirement is None:
        # Options-only line: apply its side effects, nothing to return.
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)
328
329
class RequirementsFileParser:
    """Parse a requirements file, following nested -r / -c references.

    The parser fetches file content through the given session, preprocesses
    it, and runs every line through ``line_parser``.
    """

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        """Store the session used to fetch files and the per-line parser."""
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines.

        Lines that reference a nested requirements/constraints file are not
        yielded themselves; the lines of the nested file are yielded instead.

        :raises RequirementsFileParseError: on an unparsable line or a
            recursive file reference.
        """
        # The root file is recorded with no includer (None).
        yield from self._parse_and_recurse(
            filename, constraint, [{os.path.abspath(filename): None}]
        )

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: list[dict[str, str | None]],
    ) -> Generator[ParsedLine, None, None]:
        """Yield the lines of ``filename``, descending into nested files.

        :param parsed_files_stack: Stack of mappings from an already-visited
            file to the file that included it (None for the root). Only the
            first element, describing the current include chain, is read.
        """
        for line in self._parse_file(filename, constraint):
            if line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                # (only the first -r / -c value on the line is followed)
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work
                    # and then abspath so that we can identify recursive references
                    req_path = os.path.abspath(
                        os.path.join(
                            os.path.dirname(filename),
                            req_path,
                        )
                    )
                parsed_files = parsed_files_stack[0]
                if req_path in parsed_files:
                    initial_file = parsed_files[req_path]
                    tail = (
                        f" and again in {initial_file}"
                        if initial_file is not None
                        else ""
                    )
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself in {filename}{tail}"
                    )
                # Keep track of where each file was first included. A copy is
                # used so sibling includes do not see each other's entries.
                new_parsed_files = parsed_files.copy()
                new_parsed_files[req_path] = filename
                yield from self._parse_and_recurse(
                    req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Yield a ParsedLine for every preprocessed line of one file.

        :raises RequirementsFileParseError: when the line parser rejects a
            line; the original OptionParsingError is attached as the cause.
        """
        _, content = get_file_content(filename, self._session, constraint=constraint)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add offending line
                msg = f"Invalid requirement: {line}\n{e.msg}"
                # Chain explicitly so the option-parsing failure is kept as
                # the direct cause of the error.
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )
421
422
def get_line_parser(finder: PackageFinder | None) -> LineParser:
    """Return a function that parses one line into (args string, option values).

    :param finder: When given, its format and release controls seed the
        option defaults used for every line.
    """

    def parse_line(line: str) -> tuple[str, Values]:
        # Build new parser for each line since it accumulates appendable
        # options.
        option_parser = build_parser()
        initial_values = option_parser.get_default_values()
        # presumably reset so that only an index url given on this very line
        # shows up as set — verify against handle_option_line()
        initial_values.index_url = None
        if finder:
            initial_values.format_control = finder.format_control
            initial_values.release_control = finder.release_control

        args_part, options_part = break_args_options(line)

        try:
            option_tokens = shlex.split(options_part)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_part}") from e

        parsed_values, _ = option_parser.parse_args(option_tokens, initial_values)
        return args_part, parsed_values

    return parse_line
446
447
def break_args_options(line: str) -> tuple[str, str]:
    """Break up the line into an args and options string. We only want to shlex
    (and then optparse) the options, not the args. args can contain markers
    which are corrupted by shlex.

    The line is split on single spaces; everything before the first token
    that starts with "-" is the args string, that token and everything after
    it is the options string. Either part may be empty.

    :param line: A preprocessed requirements-file line.
    :returns: The pair ``(args, options)``.
    """
    tokens = line.split(" ")
    # Find the first option-looking token. "--long" options also start with
    # "-", so a single prefix check covers both forms.
    split_at = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    # Slicing once avoids repeatedly popping from the front of a list.
    return " ".join(tokens[:split_at]), " ".join(tokens[split_at:])
463
464
class OptionParsingError(Exception):
    """Raised when the options part of a requirements-file line cannot be parsed.

    :param msg: Human-readable description; available as ``msg`` and, because
        it is forwarded to ``Exception``, also through ``str()`` and ``args``.
    """

    def __init__(self, msg: str) -> None:
        # Forward the message so str(exc), exc.args and tracebacks are not empty.
        super().__init__(msg)
        self.msg = msg
468
469
def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines

    The parser knows every option in SUPPORTED_OPTIONS and
    SUPPORTED_OPTIONS_REQ, has no help option, and raises
    OptionParsingError where its ``exit`` would otherwise be called.
    """
    parser = optparse.OptionParser(add_help_option=False)

    for make_option in SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ:
        parser.add_option(make_option())

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    # The replacement is stored on the instance, so it is called without
    # binding: whatever optparse passes first (presumably the exit status)
    # lands in ``self``.
    def raise_parsing_error(self: Any, msg: str) -> NoReturn:
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    # https://github.com/python/mypy/issues/2427
    parser.exit = raise_parsing_error  # type: ignore

    return parser
491
492
def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    """Merge backslash-continued lines into single logical lines.

    A line ending in a backslash is glued to the line(s) that follow it,
    unless it is a comment line. The joined line takes on the index of the
    first line.
    """
    pending: list[str] = []
    first_line_number = None

    for line_number, text in lines_enum:
        is_comment = COMMENT_RE.match(text)

        if text.endswith("\\") and not is_comment:
            # Continuation: remember where the logical line started and keep
            # collecting.
            if not pending:
                first_line_number = line_number
            pending.append(text.strip("\\"))
            continue

        if is_comment:
            # this ensures comments are always matched later
            text = " " + text

        if not pending:
            yield line_number, text
            continue

        # This line terminates the continuation collected so far.
        pending.append(text)
        assert first_line_number is not None
        yield first_line_number, "".join(pending)
        pending = []

    # last line contains \
    if pending:
        assert first_line_number is not None
        yield first_line_number, "".join(pending)

    # TODO: handle space after '\'.
522
523
def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Remove comments and surrounding whitespace, dropping lines left empty.

    Line numbers of the surviving lines are passed through unchanged.
    """
    for line_number, raw_line in lines_enum:
        cleaned = COMMENT_RE.sub("", raw_line).strip()
        if not cleaned:
            continue
        yield line_number, cleaned
533
534
def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute `${NAME}` references with values from `os.getenv`.

    A reference is left untouched when the variable is unset or empty.

    Only the `${MY_VARIABLE_1}` form is recognised, for two reasons:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Ensure consistency across platforms for requirement files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letter, digits and the `_` (underscore).
    """
    for line_number, line in lines_enum:
        # The references are collected from the line as it was read;
        # substitutions are then applied one after another.
        for reference in ENV_VAR_RE.finditer(line):
            value = os.getenv(reference.group("name"))
            if value:
                line = line.replace(reference.group("var"), value)

        yield line_number, line
560
561
def get_file_content(
    url: str, session: PipSession, *, constraint: bool = False
) -> tuple[str, str]:
    """Gets the content of a file; it may be a filename, file: URL, or
    http: URL. Returns (location, content). Content is a str.

    For http:, https: and file: URLs the content is the text of the response
    obtained through ``session``. For a bare path the file is read as bytes
    and decoded by _decode_req_file(), which respects BOMs and
    # -*- coding: declarations.

    :param url: File path or url.
    :param session: PipSession instance.
    :param constraint: Only affects the wording of the error message
        ("constraint" vs "requirements" file).
    :raises InstallationError: when a bare path cannot be opened or read; the
        underlying OSError is attached as the cause.
    """
    scheme = urllib.parse.urlsplit(url).scheme
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if scheme in ["http", "https", "file"]:
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as f:
            raw_content = f.read()
    except OSError as exc:
        kind = "constraint" if constraint else "requirements"
        # Chain explicitly so the original OSError stays attached as the cause.
        raise InstallationError(f"Could not open {kind} file: {exc}") from exc

    content = _decode_req_file(raw_content, url)

    return url, content
593
594
def _decode_req_file(data: bytes, url: str) -> str:
    """Decode the raw bytes of a requirements file to text.

    The encoding is chosen by the first rule that applies:

    1. a byte-order mark listed in BOMS (the mark itself is dropped);
    2. a coding declaration in a "#" comment on one of the first two lines;
    3. DEFAULT_ENCODING;
    4. the locale's preferred encoding, with a warning.

    :param data: Raw file content.
    :param url: Location of the file; only used in the warning message.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    for line in data.split(b"\n")[:2]:
        # Slice rather than index so an empty line compares cleanly.
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.groups()[0].decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        # Use the module logger, not the root-level logging.warning(), so the
        # record carries this module's name and no implicit basicConfig() runs.
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)