Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/_pytest/pathlib.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import atexit
4import contextlib
5from enum import Enum
6from errno import EBADF
7from errno import ELOOP
8from errno import ENOENT
9from errno import ENOTDIR
10import fnmatch
11from functools import partial
12from importlib.machinery import ModuleSpec
13import importlib.util
14import itertools
15import os
16from os.path import expanduser
17from os.path import expandvars
18from os.path import isabs
19from os.path import sep
20from pathlib import Path
21from pathlib import PurePath
22from posixpath import sep as posix_sep
23import shutil
24import sys
25import types
26from types import ModuleType
27from typing import Any
28from typing import Callable
29from typing import Iterable
30from typing import Iterator
31from typing import TypeVar
32import uuid
33import warnings
35from _pytest.compat import assert_never
36from _pytest.outcomes import skip
37from _pytest.warning_types import PytestWarning
40LOCK_TIMEOUT = 60 * 60 * 24 * 3
43_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
45# The following function, variables and comments were
46# copied from cpython 3.9 Lib/pathlib.py file.
48# EBADF - guard against macOS `stat` throwing EBADF
49_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)
51_IGNORED_WINERRORS = (
52 21, # ERROR_NOT_READY - drive exists but is not accessible
53 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
54)
57def _ignore_error(exception: Exception) -> bool:
58 return (
59 getattr(exception, "errno", None) in _IGNORED_ERRORS
60 or getattr(exception, "winerror", None) in _IGNORED_WINERRORS
61 )
64def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
65 return path.joinpath(".lock")
68def on_rm_rf_error(
69 func: Callable[..., Any] | None,
70 path: str,
71 excinfo: BaseException
72 | tuple[type[BaseException], BaseException, types.TracebackType | None],
73 *,
74 start_path: Path,
75) -> bool:
76 """Handle known read-only errors during rmtree.
78 The returned value is used only by our own tests.
79 """
80 if isinstance(excinfo, BaseException):
81 exc = excinfo
82 else:
83 exc = excinfo[1]
85 # Another process removed the file in the middle of the "rm_rf" (xdist for example).
86 # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
87 if isinstance(exc, FileNotFoundError):
88 return False
90 if not isinstance(exc, PermissionError):
91 warnings.warn(
92 PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}")
93 )
94 return False
96 if func not in (os.rmdir, os.remove, os.unlink):
97 if func not in (os.open,):
98 warnings.warn(
99 PytestWarning(
100 f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
101 )
102 )
103 return False
105 # Chmod + retry.
106 import stat
108 def chmod_rw(p: str) -> None:
109 mode = os.stat(p).st_mode
110 os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
112 # For files, we need to recursively go upwards in the directories to
113 # ensure they all are also writable.
114 p = Path(path)
115 if p.is_file():
116 for parent in p.parents:
117 chmod_rw(str(parent))
118 # Stop when we reach the original path passed to rm_rf.
119 if parent == start_path:
120 break
121 chmod_rw(str(path))
123 func(path)
124 return True
127def ensure_extended_length_path(path: Path) -> Path:
128 """Get the extended-length version of a path (Windows).
130 On Windows, by default, the maximum length of a path (MAX_PATH) is 260
131 characters, and operations on paths longer than that fail. But it is possible
132 to overcome this by converting the path to "extended-length" form before
133 performing the operation:
134 https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation
136 On Windows, this function returns the extended-length absolute version of path.
137 On other platforms it returns path unchanged.
138 """
139 if sys.platform.startswith("win32"):
140 path = path.resolve()
141 path = Path(get_extended_length_path_str(str(path)))
142 return path
145def get_extended_length_path_str(path: str) -> str:
146 """Convert a path to a Windows extended length path."""
147 long_path_prefix = "\\\\?\\"
148 unc_long_path_prefix = "\\\\?\\UNC\\"
149 if path.startswith((long_path_prefix, unc_long_path_prefix)):
150 return path
151 # UNC
152 if path.startswith("\\\\"):
153 return unc_long_path_prefix + path[2:]
154 return long_path_prefix + path
157def rm_rf(path: Path) -> None:
158 """Remove the path contents recursively, even if some elements
159 are read-only."""
160 path = ensure_extended_length_path(path)
161 onerror = partial(on_rm_rf_error, start_path=path)
162 if sys.version_info >= (3, 12):
163 shutil.rmtree(str(path), onexc=onerror)
164 else:
165 shutil.rmtree(str(path), onerror=onerror)
168def find_prefixed(root: Path, prefix: str) -> Iterator[os.DirEntry[str]]:
169 """Find all elements in root that begin with the prefix, case-insensitive."""
170 l_prefix = prefix.lower()
171 for x in os.scandir(root):
172 if x.name.lower().startswith(l_prefix):
173 yield x
176def extract_suffixes(iter: Iterable[os.DirEntry[str]], prefix: str) -> Iterator[str]:
177 """Return the parts of the paths following the prefix.
179 :param iter: Iterator over path names.
180 :param prefix: Expected prefix of the path names.
181 """
182 p_len = len(prefix)
183 for entry in iter:
184 yield entry.name[p_len:]
187def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
188 """Combine find_prefixes and extract_suffixes."""
189 return extract_suffixes(find_prefixed(root, prefix), prefix)
192def parse_num(maybe_num: str) -> int:
193 """Parse number path suffixes, returns -1 on error."""
194 try:
195 return int(maybe_num)
196 except ValueError:
197 return -1
200def _force_symlink(root: Path, target: str | PurePath, link_to: str | Path) -> None:
201 """Helper to create the current symlink.
203 It's full of race conditions that are reasonably OK to ignore
204 for the context of best effort linking to the latest test run.
206 The presumption being that in case of much parallelism
207 the inaccuracy is going to be acceptable.
208 """
209 current_symlink = root.joinpath(target)
210 try:
211 current_symlink.unlink()
212 except OSError:
213 pass
214 try:
215 current_symlink.symlink_to(link_to)
216 except Exception:
217 pass
220def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
221 """Create a directory with an increased number as suffix for the given prefix."""
222 for i in range(10):
223 # try up to 10 times to create the folder
224 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
225 new_number = max_existing + 1
226 new_path = root.joinpath(f"{prefix}{new_number}")
227 try:
228 new_path.mkdir(mode=mode)
229 except Exception:
230 pass
231 else:
232 _force_symlink(root, prefix + "current", new_path)
233 return new_path
234 else:
235 raise OSError(
236 "could not create numbered dir with prefix "
237 f"{prefix} in {root} after 10 tries"
238 )
241def create_cleanup_lock(p: Path) -> Path:
242 """Create a lock to prevent premature folder cleanup."""
243 lock_path = get_lock_path(p)
244 try:
245 fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
246 except FileExistsError as e:
247 raise OSError(f"cannot create lockfile in {p}") from e
248 else:
249 pid = os.getpid()
250 spid = str(pid).encode()
251 os.write(fd, spid)
252 os.close(fd)
253 if not lock_path.is_file():
254 raise OSError("lock path got renamed after successful creation")
255 return lock_path
258def register_cleanup_lock_removal(
259 lock_path: Path, register: Any = atexit.register
260) -> Any:
261 """Register a cleanup function for removing a lock, by default on atexit."""
262 pid = os.getpid()
264 def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
265 current_pid = os.getpid()
266 if current_pid != original_pid:
267 # fork
268 return
269 try:
270 lock_path.unlink()
271 except OSError:
272 pass
274 return register(cleanup_on_exit)
277def maybe_delete_a_numbered_dir(path: Path) -> None:
278 """Remove a numbered directory if its lock can be obtained and it does
279 not seem to be in use."""
280 path = ensure_extended_length_path(path)
281 lock_path = None
282 try:
283 lock_path = create_cleanup_lock(path)
284 parent = path.parent
286 garbage = parent.joinpath(f"garbage-{uuid.uuid4()}")
287 path.rename(garbage)
288 rm_rf(garbage)
289 except OSError:
290 # known races:
291 # * other process did a cleanup at the same time
292 # * deletable folder was found
293 # * process cwd (Windows)
294 return
295 finally:
296 # If we created the lock, ensure we remove it even if we failed
297 # to properly remove the numbered dir.
298 if lock_path is not None:
299 try:
300 lock_path.unlink()
301 except OSError:
302 pass
305def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
306 """Check if `path` is deletable based on whether the lock file is expired."""
307 if path.is_symlink():
308 return False
309 lock = get_lock_path(path)
310 try:
311 if not lock.is_file():
312 return True
313 except OSError:
314 # we might not have access to the lock file at all, in this case assume
315 # we don't have access to the entire directory (#7491).
316 return False
317 try:
318 lock_time = lock.stat().st_mtime
319 except Exception:
320 return False
321 else:
322 if lock_time < consider_lock_dead_if_created_before:
323 # We want to ignore any errors while trying to remove the lock such as:
324 # - PermissionDenied, like the file permissions have changed since the lock creation;
325 # - FileNotFoundError, in case another pytest process got here first;
326 # and any other cause of failure.
327 with contextlib.suppress(OSError):
328 lock.unlink()
329 return True
330 return False
333def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
334 """Try to cleanup a folder if we can ensure it's deletable."""
335 if ensure_deletable(path, consider_lock_dead_if_created_before):
336 maybe_delete_a_numbered_dir(path)
339def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
340 """List candidates for numbered directories to be removed - follows py.path."""
341 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
342 max_delete = max_existing - keep
343 entries = find_prefixed(root, prefix)
344 entries, entries2 = itertools.tee(entries)
345 numbers = map(parse_num, extract_suffixes(entries2, prefix))
346 for entry, number in zip(entries, numbers):
347 if number <= max_delete:
348 yield Path(entry)
351def cleanup_dead_symlinks(root: Path) -> None:
352 for left_dir in root.iterdir():
353 if left_dir.is_symlink():
354 if not left_dir.resolve().exists():
355 left_dir.unlink()
358def cleanup_numbered_dir(
359 root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
360) -> None:
361 """Cleanup for lock driven numbered directories."""
362 if not root.exists():
363 return
364 for path in cleanup_candidates(root, prefix, keep):
365 try_cleanup(path, consider_lock_dead_if_created_before)
366 for path in root.glob("garbage-*"):
367 try_cleanup(path, consider_lock_dead_if_created_before)
369 cleanup_dead_symlinks(root)
372def make_numbered_dir_with_cleanup(
373 root: Path,
374 prefix: str,
375 keep: int,
376 lock_timeout: float,
377 mode: int,
378) -> Path:
379 """Create a numbered dir with a cleanup lock and remove old ones."""
380 e = None
381 for i in range(10):
382 try:
383 p = make_numbered_dir(root, prefix, mode)
384 # Only lock the current dir when keep is not 0
385 if keep != 0:
386 lock_path = create_cleanup_lock(p)
387 register_cleanup_lock_removal(lock_path)
388 except Exception as exc:
389 e = exc
390 else:
391 consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
392 # Register a cleanup for program exit
393 atexit.register(
394 cleanup_numbered_dir,
395 root,
396 prefix,
397 keep,
398 consider_lock_dead_if_created_before,
399 )
400 return p
401 assert e is not None
402 raise e
405def resolve_from_str(input: str, rootpath: Path) -> Path:
406 input = expanduser(input)
407 input = expandvars(input)
408 if isabs(input):
409 return Path(input)
410 else:
411 return rootpath.joinpath(input)
414def fnmatch_ex(pattern: str, path: str | os.PathLike[str]) -> bool:
415 """A port of FNMatcher from py.path.common which works with PurePath() instances.
417 The difference between this algorithm and PurePath.match() is that the
418 latter matches "**" glob expressions for each part of the path, while
419 this algorithm uses the whole path instead.
421 For example:
422 "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
423 with this algorithm, but not with PurePath.match().
425 This algorithm was ported to keep backward-compatibility with existing
426 settings which assume paths match according this logic.
428 References:
429 * https://bugs.python.org/issue29249
430 * https://bugs.python.org/issue34731
431 """
432 path = PurePath(path)
433 iswin32 = sys.platform.startswith("win")
435 if iswin32 and sep not in pattern and posix_sep in pattern:
436 # Running on Windows, the pattern has no Windows path separators,
437 # and the pattern has one or more Posix path separators. Replace
438 # the Posix path separators with the Windows path separator.
439 pattern = pattern.replace(posix_sep, sep)
441 if sep not in pattern:
442 name = path.name
443 else:
444 name = str(path)
445 if path.is_absolute() and not os.path.isabs(pattern):
446 pattern = f"*{os.sep}{pattern}"
447 return fnmatch.fnmatch(name, pattern)
450def parts(s: str) -> set[str]:
451 parts = s.split(sep)
452 return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
455def symlink_or_skip(
456 src: os.PathLike[str] | str,
457 dst: os.PathLike[str] | str,
458 **kwargs: Any,
459) -> None:
460 """Make a symlink, or skip the test in case symlinks are not supported."""
461 try:
462 os.symlink(src, dst, **kwargs)
463 except OSError as e:
464 skip(f"symlinks not supported: {e}")
467class ImportMode(Enum):
468 """Possible values for `mode` parameter of `import_path`."""
470 prepend = "prepend"
471 append = "append"
472 importlib = "importlib"
475class ImportPathMismatchError(ImportError):
476 """Raised on import_path() if there is a mismatch of __file__'s.
478 This can happen when `import_path` is called multiple times with different filenames that has
479 the same basename but reside in packages
480 (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
481 """
484def import_path(
485 path: str | os.PathLike[str],
486 *,
487 mode: str | ImportMode = ImportMode.prepend,
488 root: Path,
489 consider_namespace_packages: bool,
490) -> ModuleType:
491 """
492 Import and return a module from the given path, which can be a file (a module) or
493 a directory (a package).
495 :param path:
496 Path to the file to import.
498 :param mode:
499 Controls the underlying import mechanism that will be used:
501 * ImportMode.prepend: the directory containing the module (or package, taking
502 `__init__.py` files into account) will be put at the *start* of `sys.path` before
503 being imported with `importlib.import_module`.
505 * ImportMode.append: same as `prepend`, but the directory will be appended
506 to the end of `sys.path`, if not already in `sys.path`.
508 * ImportMode.importlib: uses more fine control mechanisms provided by `importlib`
509 to import the module, which avoids having to muck with `sys.path` at all. It effectively
510 allows having same-named test modules in different places.
512 :param root:
513 Used as an anchor when mode == ImportMode.importlib to obtain
514 a unique name for the module being imported so it can safely be stored
515 into ``sys.modules``.
517 :param consider_namespace_packages:
518 If True, consider namespace packages when resolving module names.
520 :raises ImportPathMismatchError:
521 If after importing the given `path` and the module `__file__`
522 are different. Only raised in `prepend` and `append` modes.
523 """
524 path = Path(path)
525 mode = ImportMode(mode)
527 if not path.exists():
528 raise ImportError(path)
530 if mode is ImportMode.importlib:
531 # Try to import this module using the standard import mechanisms, but
532 # without touching sys.path.
533 try:
534 pkg_root, module_name = resolve_pkg_root_and_module_name(
535 path, consider_namespace_packages=consider_namespace_packages
536 )
537 except CouldNotResolvePathError:
538 pass
539 else:
540 # If the given module name is already in sys.modules, do not import it again.
541 with contextlib.suppress(KeyError):
542 return sys.modules[module_name]
544 mod = _import_module_using_spec(
545 module_name, path, pkg_root, insert_modules=False
546 )
547 if mod is not None:
548 return mod
550 # Could not import the module with the current sys.path, so we fall back
551 # to importing the file as a single module, not being a part of a package.
552 module_name = module_name_from_path(path, root)
553 with contextlib.suppress(KeyError):
554 return sys.modules[module_name]
556 mod = _import_module_using_spec(
557 module_name, path, path.parent, insert_modules=True
558 )
559 if mod is None:
560 raise ImportError(f"Can't find module {module_name} at location {path}")
561 return mod
563 try:
564 pkg_root, module_name = resolve_pkg_root_and_module_name(
565 path, consider_namespace_packages=consider_namespace_packages
566 )
567 except CouldNotResolvePathError:
568 pkg_root, module_name = path.parent, path.stem
570 # Change sys.path permanently: restoring it at the end of this function would cause surprising
571 # problems because of delayed imports: for example, a conftest.py file imported by this function
572 # might have local imports, which would fail at runtime if we restored sys.path.
573 if mode is ImportMode.append:
574 if str(pkg_root) not in sys.path:
575 sys.path.append(str(pkg_root))
576 elif mode is ImportMode.prepend:
577 if str(pkg_root) != sys.path[0]:
578 sys.path.insert(0, str(pkg_root))
579 else:
580 assert_never(mode)
582 importlib.import_module(module_name)
584 mod = sys.modules[module_name]
585 if path.name == "__init__.py":
586 return mod
588 ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "")
589 if ignore != "1":
590 module_file = mod.__file__
591 if module_file is None:
592 raise ImportPathMismatchError(module_name, module_file, path)
594 if module_file.endswith((".pyc", ".pyo")):
595 module_file = module_file[:-1]
596 if module_file.endswith(os.sep + "__init__.py"):
597 module_file = module_file[: -(len(os.sep + "__init__.py"))]
599 try:
600 is_same = _is_same(str(path), module_file)
601 except FileNotFoundError:
602 is_same = False
604 if not is_same:
605 raise ImportPathMismatchError(module_name, module_file, path)
607 return mod
610def _import_module_using_spec(
611 module_name: str, module_path: Path, module_location: Path, *, insert_modules: bool
612) -> ModuleType | None:
613 """
614 Tries to import a module by its canonical name, path to the .py file, and its
615 parent location.
617 :param insert_modules:
618 If True, will call insert_missing_modules to create empty intermediate modules
619 for made-up module names (when importing test files not reachable from sys.path).
620 """
621 # Checking with sys.meta_path first in case one of its hooks can import this module,
622 # such as our own assertion-rewrite hook.
623 for meta_importer in sys.meta_path:
624 spec = meta_importer.find_spec(module_name, [str(module_location)])
625 if spec_matches_module_path(spec, module_path):
626 break
627 else:
628 spec = importlib.util.spec_from_file_location(module_name, str(module_path))
630 if spec_matches_module_path(spec, module_path):
631 assert spec is not None
632 # Attempt to import the parent module, seems is our responsibility:
633 # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311
634 parent_module_name, _, name = module_name.rpartition(".")
635 parent_module: ModuleType | None = None
636 if parent_module_name:
637 parent_module = sys.modules.get(parent_module_name)
638 if parent_module is None:
639 # Find the directory of this module's parent.
640 parent_dir = (
641 module_path.parent.parent
642 if module_path.name == "__init__.py"
643 else module_path.parent
644 )
645 # Consider the parent module path as its __init__.py file, if it has one.
646 parent_module_path = (
647 parent_dir / "__init__.py"
648 if (parent_dir / "__init__.py").is_file()
649 else parent_dir
650 )
651 parent_module = _import_module_using_spec(
652 parent_module_name,
653 parent_module_path,
654 parent_dir,
655 insert_modules=insert_modules,
656 )
658 # Find spec and import this module.
659 mod = importlib.util.module_from_spec(spec)
660 sys.modules[module_name] = mod
661 spec.loader.exec_module(mod) # type: ignore[union-attr]
663 # Set this module as an attribute of the parent module (#12194).
664 if parent_module is not None:
665 setattr(parent_module, name, mod)
667 if insert_modules:
668 insert_missing_modules(sys.modules, module_name)
669 return mod
671 return None
674def spec_matches_module_path(module_spec: ModuleSpec | None, module_path: Path) -> bool:
675 """Return true if the given ModuleSpec can be used to import the given module path."""
676 if module_spec is None or module_spec.origin is None:
677 return False
679 return Path(module_spec.origin) == module_path
682# Implement a special _is_same function on Windows which returns True if the two filenames
683# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678).
684if sys.platform.startswith("win"):
686 def _is_same(f1: str, f2: str) -> bool:
687 return Path(f1) == Path(f2) or os.path.samefile(f1, f2)
689else:
691 def _is_same(f1: str, f2: str) -> bool:
692 return os.path.samefile(f1, f2)
695def module_name_from_path(path: Path, root: Path) -> str:
696 """
697 Return a dotted module name based on the given path, anchored on root.
699 For example: path="projects/src/tests/test_foo.py" and root="/projects", the
700 resulting module name will be "src.tests.test_foo".
701 """
702 path = path.with_suffix("")
703 try:
704 relative_path = path.relative_to(root)
705 except ValueError:
706 # If we can't get a relative path to root, use the full path, except
707 # for the first part ("d:\\" or "/" depending on the platform, for example).
708 path_parts = path.parts[1:]
709 else:
710 # Use the parts for the relative path to the root path.
711 path_parts = relative_path.parts
713 # Module name for packages do not contain the __init__ file, unless
714 # the `__init__.py` file is at the root.
715 if len(path_parts) >= 2 and path_parts[-1] == "__init__":
716 path_parts = path_parts[:-1]
718 # Module names cannot contain ".", normalize them to "_". This prevents
719 # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules.
720 # Also, important to replace "." at the start of paths, as those are considered relative imports.
721 path_parts = tuple(x.replace(".", "_") for x in path_parts)
723 return ".".join(path_parts)
726def insert_missing_modules(modules: dict[str, ModuleType], module_name: str) -> None:
727 """
728 Used by ``import_path`` to create intermediate modules when using mode=importlib.
730 When we want to import a module as "src.tests.test_foo" for example, we need
731 to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
732 otherwise "src.tests.test_foo" is not importable by ``__import__``.
733 """
734 module_parts = module_name.split(".")
735 while module_name:
736 parent_module_name, _, child_name = module_name.rpartition(".")
737 if parent_module_name:
738 parent_module = modules.get(parent_module_name)
739 if parent_module is None:
740 try:
741 # If sys.meta_path is empty, calling import_module will issue
742 # a warning and raise ModuleNotFoundError. To avoid the
743 # warning, we check sys.meta_path explicitly and raise the error
744 # ourselves to fall back to creating a dummy module.
745 if not sys.meta_path:
746 raise ModuleNotFoundError
747 parent_module = importlib.import_module(parent_module_name)
748 except ModuleNotFoundError:
749 parent_module = ModuleType(
750 module_name,
751 doc="Empty module created by pytest's importmode=importlib.",
752 )
753 modules[parent_module_name] = parent_module
755 # Add child attribute to the parent that can reference the child
756 # modules.
757 if not hasattr(parent_module, child_name):
758 setattr(parent_module, child_name, modules[module_name])
760 module_parts.pop(-1)
761 module_name = ".".join(module_parts)
764def resolve_package_path(path: Path) -> Path | None:
765 """Return the Python package path by looking for the last
766 directory upwards which still contains an __init__.py.
768 Returns None if it cannot be determined.
769 """
770 result = None
771 for parent in itertools.chain((path,), path.parents):
772 if parent.is_dir():
773 if not (parent / "__init__.py").is_file():
774 break
775 if not parent.name.isidentifier():
776 break
777 result = parent
778 return result
781def resolve_pkg_root_and_module_name(
782 path: Path, *, consider_namespace_packages: bool = False
783) -> tuple[Path, str]:
784 """
785 Return the path to the directory of the root package that contains the
786 given Python file, and its module name:
788 src/
789 app/
790 __init__.py
791 core/
792 __init__.py
793 models.py
795 Passing the full path to `models.py` will yield Path("src") and "app.core.models".
797 If consider_namespace_packages is True, then we additionally check upwards in the hierarchy
798 for namespace packages:
800 https://packaging.python.org/en/latest/guides/packaging-namespace-packages
802 Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files).
803 """
804 pkg_root: Path | None = None
805 pkg_path = resolve_package_path(path)
806 if pkg_path is not None:
807 pkg_root = pkg_path.parent
808 if consider_namespace_packages:
809 start = pkg_root if pkg_root is not None else path.parent
810 for candidate in (start, *start.parents):
811 module_name = compute_module_name(candidate, path)
812 if module_name and is_importable(module_name, path):
813 # Point the pkg_root to the root of the namespace package.
814 pkg_root = candidate
815 break
817 if pkg_root is not None:
818 module_name = compute_module_name(pkg_root, path)
819 if module_name:
820 return pkg_root, module_name
822 raise CouldNotResolvePathError(f"Could not resolve for {path}")
825def is_importable(module_name: str, module_path: Path) -> bool:
826 """
827 Return if the given module path could be imported normally by Python, akin to the user
828 entering the REPL and importing the corresponding module name directly, and corresponds
829 to the module_path specified.
831 :param module_name:
832 Full module name that we want to check if is importable.
833 For example, "app.models".
835 :param module_path:
836 Full path to the python module/package we want to check if is importable.
837 For example, "/projects/src/app/models.py".
838 """
839 try:
840 # Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through
841 # sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``).
842 # Using importlib.util.find_spec() is different, it gives the same results as trying to import
843 # the module normally in the REPL.
844 spec = importlib.util.find_spec(module_name)
845 except (ImportError, ValueError, ImportWarning):
846 return False
847 else:
848 return spec_matches_module_path(spec, module_path)
851def compute_module_name(root: Path, module_path: Path) -> str | None:
852 """Compute a module name based on a path and a root anchor."""
853 try:
854 path_without_suffix = module_path.with_suffix("")
855 except ValueError:
856 # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter).
857 return None
859 try:
860 relative = path_without_suffix.relative_to(root)
861 except ValueError: # pragma: no cover
862 return None
863 names = list(relative.parts)
864 if not names:
865 return None
866 if names[-1] == "__init__":
867 names.pop()
868 return ".".join(names)
871class CouldNotResolvePathError(Exception):
872 """Custom exception raised by resolve_pkg_root_and_module_name."""
875def scandir(
876 path: str | os.PathLike[str],
877 sort_key: Callable[[os.DirEntry[str]], object] = lambda entry: entry.name,
878) -> list[os.DirEntry[str]]:
879 """Scan a directory recursively, in breadth-first order.
881 The returned entries are sorted according to the given key.
882 The default is to sort by name.
883 """
884 entries = []
885 with os.scandir(path) as s:
886 # Skip entries with symlink loops and other brokenness, so the caller
887 # doesn't have to deal with it.
888 for entry in s:
889 try:
890 entry.is_file()
891 except OSError as err:
892 if _ignore_error(err):
893 continue
894 raise
895 entries.append(entry)
896 entries.sort(key=sort_key) # type: ignore[arg-type]
897 return entries
900def visit(
901 path: str | os.PathLike[str], recurse: Callable[[os.DirEntry[str]], bool]
902) -> Iterator[os.DirEntry[str]]:
903 """Walk a directory recursively, in breadth-first order.
905 The `recurse` predicate determines whether a directory is recursed.
907 Entries at each directory level are sorted.
908 """
909 entries = scandir(path)
910 yield from entries
911 for entry in entries:
912 if entry.is_dir() and recurse(entry):
913 yield from visit(entry.path, recurse)
916def absolutepath(path: str | os.PathLike[str]) -> Path:
917 """Convert a path to an absolute path using os.path.abspath.
919 Prefer this over Path.resolve() (see #6523).
920 Prefer this over Path.absolute() (not public, doesn't normalize).
921 """
922 return Path(os.path.abspath(path))
925def commonpath(path1: Path, path2: Path) -> Path | None:
926 """Return the common part shared with the other path, or None if there is
927 no common part.
929 If one path is relative and one is absolute, returns None.
930 """
931 try:
932 return Path(os.path.commonpath((str(path1), str(path2))))
933 except ValueError:
934 return None
937def bestrelpath(directory: Path, dest: Path) -> str:
938 """Return a string which is a relative path from directory to dest such
939 that directory/bestrelpath == dest.
941 The paths must be either both absolute or both relative.
943 If no such path can be determined, returns dest.
944 """
945 assert isinstance(directory, Path)
946 assert isinstance(dest, Path)
947 if dest == directory:
948 return os.curdir
949 # Find the longest common directory.
950 base = commonpath(directory, dest)
951 # Can be the case on Windows for two absolute paths on different drives.
952 # Can be the case for two relative paths without common prefix.
953 # Can be the case for a relative path and an absolute path.
954 if not base:
955 return str(dest)
956 reldirectory = directory.relative_to(base)
957 reldest = dest.relative_to(base)
958 return os.path.join(
959 # Back from directory to base.
960 *([os.pardir] * len(reldirectory.parts)),
961 # Forward from base to dest.
962 *reldest.parts,
963 )
966def safe_exists(p: Path) -> bool:
967 """Like Path.exists(), but account for input arguments that might be too long (#11394)."""
968 try:
969 return p.exists()
970 except (ValueError, OSError):
971 # ValueError: stat: path too long for Windows
972 # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
973 return False