Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/_pytest/pathlib.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from collections.abc import Callable
4from collections.abc import Iterable
5from collections.abc import Iterator
6import contextlib
7from enum import Enum
8from errno import EBADF
9from errno import ELOOP
10from errno import ENOENT
11from errno import ENOTDIR
12import fnmatch
13from functools import partial
14from importlib.machinery import ModuleSpec
15from importlib.machinery import PathFinder
16import importlib.util
17import itertools
18import os
19from os.path import expanduser
20from os.path import expandvars
21from os.path import isabs
22from os.path import sep
23from pathlib import Path
24from pathlib import PurePath
25from posixpath import sep as posix_sep
26import shutil
27import sys
28import types
29from types import ModuleType
30from typing import Any
31from typing import TypeVar
32import uuid
33import warnings
35from _pytest.compat import assert_never
36from _pytest.outcomes import skip
37from _pytest.warning_types import PytestWarning
40if sys.version_info < (3, 11):
41 from importlib._bootstrap_external import _NamespaceLoader as NamespaceLoader
42else:
43 from importlib.machinery import NamespaceLoader
45LOCK_TIMEOUT = 60 * 60 * 24 * 3
47_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
49# The following function, variables and comments were
50# copied from cpython 3.9 Lib/pathlib.py file.
52# EBADF - guard against macOS `stat` throwing EBADF
53_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)
55_IGNORED_WINERRORS = (
56 21, # ERROR_NOT_READY - drive exists but is not accessible
57 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
58)
61def _ignore_error(exception: Exception) -> bool:
62 return (
63 getattr(exception, "errno", None) in _IGNORED_ERRORS
64 or getattr(exception, "winerror", None) in _IGNORED_WINERRORS
65 )
68def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
69 return path.joinpath(".lock")
72def on_rm_rf_error(
73 func: Callable[..., Any] | None,
74 path: str,
75 excinfo: BaseException
76 | tuple[type[BaseException], BaseException, types.TracebackType | None],
77 *,
78 start_path: Path,
79) -> bool:
80 """Handle known read-only errors during rmtree.
82 The returned value is used only by our own tests.
83 """
84 if isinstance(excinfo, BaseException):
85 exc = excinfo
86 else:
87 exc = excinfo[1]
89 # Another process removed the file in the middle of the "rm_rf" (xdist for example).
90 # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
91 if isinstance(exc, FileNotFoundError):
92 return False
94 if not isinstance(exc, PermissionError):
95 warnings.warn(
96 PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}")
97 )
98 return False
100 if func not in (os.rmdir, os.remove, os.unlink):
101 if func not in (os.open,):
102 warnings.warn(
103 PytestWarning(
104 f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
105 )
106 )
107 return False
109 # Chmod + retry.
110 import stat
112 def chmod_rw(p: str) -> None:
113 mode = os.stat(p).st_mode
114 os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
116 # For files, we need to recursively go upwards in the directories to
117 # ensure they all are also writable.
118 p = Path(path)
119 if p.is_file():
120 for parent in p.parents:
121 chmod_rw(str(parent))
122 # Stop when we reach the original path passed to rm_rf.
123 if parent == start_path:
124 break
125 chmod_rw(str(path))
127 func(path)
128 return True
131def ensure_extended_length_path(path: Path) -> Path:
132 """Get the extended-length version of a path (Windows).
134 On Windows, by default, the maximum length of a path (MAX_PATH) is 260
135 characters, and operations on paths longer than that fail. But it is possible
136 to overcome this by converting the path to "extended-length" form before
137 performing the operation:
138 https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation
140 On Windows, this function returns the extended-length absolute version of path.
141 On other platforms it returns path unchanged.
142 """
143 if sys.platform.startswith("win32"):
144 path = path.resolve()
145 path = Path(get_extended_length_path_str(str(path)))
146 return path
149def get_extended_length_path_str(path: str) -> str:
150 """Convert a path to a Windows extended length path."""
151 long_path_prefix = "\\\\?\\"
152 unc_long_path_prefix = "\\\\?\\UNC\\"
153 if path.startswith((long_path_prefix, unc_long_path_prefix)):
154 return path
155 # UNC
156 if path.startswith("\\\\"):
157 return unc_long_path_prefix + path[2:]
158 return long_path_prefix + path
161def rm_rf(path: Path) -> None:
162 """Remove the path contents recursively, even if some elements
163 are read-only."""
164 path = ensure_extended_length_path(path)
165 onerror = partial(on_rm_rf_error, start_path=path)
166 if sys.version_info >= (3, 12):
167 shutil.rmtree(str(path), onexc=onerror)
168 else:
169 shutil.rmtree(str(path), onerror=onerror)
172def find_prefixed(root: Path, prefix: str) -> Iterator[os.DirEntry[str]]:
173 """Find all elements in root that begin with the prefix, case-insensitive."""
174 l_prefix = prefix.lower()
175 for x in os.scandir(root):
176 if x.name.lower().startswith(l_prefix):
177 yield x
180def extract_suffixes(iter: Iterable[os.DirEntry[str]], prefix: str) -> Iterator[str]:
181 """Return the parts of the paths following the prefix.
183 :param iter: Iterator over path names.
184 :param prefix: Expected prefix of the path names.
185 """
186 p_len = len(prefix)
187 for entry in iter:
188 yield entry.name[p_len:]
191def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
192 """Combine find_prefixes and extract_suffixes."""
193 return extract_suffixes(find_prefixed(root, prefix), prefix)
196def parse_num(maybe_num: str) -> int:
197 """Parse number path suffixes, returns -1 on error."""
198 try:
199 return int(maybe_num)
200 except ValueError:
201 return -1
204def _force_symlink(root: Path, target: str | PurePath, link_to: str | Path) -> None:
205 """Helper to create the current symlink.
207 It's full of race conditions that are reasonably OK to ignore
208 for the context of best effort linking to the latest test run.
210 The presumption being that in case of much parallelism
211 the inaccuracy is going to be acceptable.
212 """
213 current_symlink = root.joinpath(target)
214 try:
215 current_symlink.unlink()
216 except OSError:
217 pass
218 try:
219 current_symlink.symlink_to(link_to)
220 except Exception:
221 pass
224def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
225 """Create a directory with an increased number as suffix for the given prefix."""
226 for i in range(10):
227 # try up to 10 times to create the directory
228 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
229 new_number = max_existing + 1
230 new_path = root.joinpath(f"{prefix}{new_number}")
231 try:
232 new_path.mkdir(mode=mode)
233 except Exception:
234 pass
235 else:
236 _force_symlink(root, prefix + "current", new_path)
237 return new_path
238 else:
239 raise OSError(
240 "could not create numbered dir with prefix "
241 f"{prefix} in {root} after 10 tries"
242 )
245def create_cleanup_lock(p: Path) -> Path:
246 """Create a lock to prevent premature directory cleanup."""
247 lock_path = get_lock_path(p)
248 try:
249 fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
250 except FileExistsError as e:
251 raise OSError(f"cannot create lockfile in {p}") from e
252 else:
253 pid = os.getpid()
254 spid = str(pid).encode()
255 os.write(fd, spid)
256 os.close(fd)
257 if not lock_path.is_file():
258 raise OSError("lock path got renamed after successful creation")
259 return lock_path
262def register_cleanup_lock_removal(lock_path: Path, register: Any) -> Any:
263 """Register a cleanup function for removing a lock."""
264 pid = os.getpid()
266 def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
267 current_pid = os.getpid()
268 if current_pid != original_pid:
269 # fork
270 return
271 try:
272 lock_path.unlink()
273 except OSError:
274 pass
276 return register(cleanup_on_exit)
279def maybe_delete_a_numbered_dir(path: Path) -> None:
280 """Remove a numbered directory if its lock can be obtained and it does
281 not seem to be in use."""
282 path = ensure_extended_length_path(path)
283 lock_path = None
284 try:
285 lock_path = create_cleanup_lock(path)
286 parent = path.parent
288 garbage = parent.joinpath(f"garbage-{uuid.uuid4()}")
289 path.rename(garbage)
290 rm_rf(garbage)
291 except OSError:
292 # known races:
293 # * other process did a cleanup at the same time
294 # * deletable directory was found
295 # * process cwd (Windows)
296 return
297 finally:
298 # If we created the lock, ensure we remove it even if we failed
299 # to properly remove the numbered dir.
300 if lock_path is not None:
301 try:
302 lock_path.unlink()
303 except OSError:
304 pass
307def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
308 """Check if `path` is deletable based on whether the lock file is expired."""
309 if path.is_symlink():
310 return False
311 lock = get_lock_path(path)
312 try:
313 if not lock.is_file():
314 return True
315 except OSError:
316 # we might not have access to the lock file at all, in this case assume
317 # we don't have access to the entire directory (#7491).
318 return False
319 try:
320 lock_time = lock.stat().st_mtime
321 except Exception:
322 return False
323 else:
324 if lock_time < consider_lock_dead_if_created_before:
325 # We want to ignore any errors while trying to remove the lock such as:
326 # - PermissionDenied, like the file permissions have changed since the lock creation;
327 # - FileNotFoundError, in case another pytest process got here first;
328 # and any other cause of failure.
329 with contextlib.suppress(OSError):
330 lock.unlink()
331 return True
332 return False
335def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
336 """Try to cleanup a directory if we can ensure it's deletable."""
337 if ensure_deletable(path, consider_lock_dead_if_created_before):
338 maybe_delete_a_numbered_dir(path)
341def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
342 """List candidates for numbered directories to be removed - follows py.path."""
343 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
344 max_delete = max_existing - keep
345 entries = find_prefixed(root, prefix)
346 entries, entries2 = itertools.tee(entries)
347 numbers = map(parse_num, extract_suffixes(entries2, prefix))
348 for entry, number in zip(entries, numbers, strict=True):
349 if number <= max_delete:
350 yield Path(entry)
353def cleanup_dead_symlinks(root: Path) -> None:
354 for left_dir in root.iterdir():
355 if left_dir.is_symlink():
356 if not left_dir.resolve().exists():
357 left_dir.unlink()
360def cleanup_numbered_dir(
361 root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
362) -> None:
363 """Cleanup for lock driven numbered directories."""
364 if not root.exists():
365 return
366 for path in cleanup_candidates(root, prefix, keep):
367 try_cleanup(path, consider_lock_dead_if_created_before)
368 for path in root.glob("garbage-*"):
369 try_cleanup(path, consider_lock_dead_if_created_before)
371 cleanup_dead_symlinks(root)
374def make_numbered_dir_with_cleanup(
375 *,
376 root: Path,
377 prefix: str,
378 mode: int,
379 keep: int,
380 lock_timeout: float,
381 register: Any,
382) -> Path:
383 """Create a numbered dir and register its cleanup.
385 Similar to make_numbered_dir, but also maintains a lock file indicating that
386 the directory is currently in use, and registers the cleanup of the lock and
387 of stale numbered directories.
389 :param keep:
390 The number of sessions to retain the directory.
391 :param lock_timeout:
392 In case of a crash, the lock remains "stuck". The timeout is a time
393 limit after which the lock is considered stale and can be removed.
394 :param register:
395 Called as register(cleanup_func, params...). Should schedule to call
396 passed cleanup functions on session finish.
397 """
398 e = None
399 for i in range(10):
400 try:
401 p = make_numbered_dir(root, prefix, mode)
402 # Only lock the current dir when keep is not 0
403 if keep != 0:
404 lock_path = create_cleanup_lock(p)
405 register_cleanup_lock_removal(lock_path, register)
406 except Exception as exc:
407 e = exc
408 else:
409 consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
410 # Register a cleanup for program exit
411 register(
412 cleanup_numbered_dir,
413 root,
414 prefix,
415 keep,
416 consider_lock_dead_if_created_before,
417 )
418 return p
419 assert e is not None
420 raise e
423def resolve_from_str(input: str, rootpath: Path) -> Path:
424 input = expanduser(input)
425 input = expandvars(input)
426 if isabs(input):
427 return Path(input)
428 else:
429 return rootpath.joinpath(input)
432def fnmatch_ex(pattern: str, path: str | os.PathLike[str]) -> bool:
433 """A port of FNMatcher from py.path.common which works with PurePath() instances.
435 The difference between this algorithm and PurePath.match() is that the
436 latter matches "**" glob expressions for each part of the path, while
437 this algorithm uses the whole path instead.
439 For example:
440 "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
441 with this algorithm, but not with PurePath.match().
443 This algorithm was ported to keep backward-compatibility with existing
444 settings which assume paths match according this logic.
446 References:
447 * https://bugs.python.org/issue29249
448 * https://bugs.python.org/issue34731
449 """
450 path = PurePath(path)
451 iswin32 = sys.platform.startswith("win")
453 if iswin32 and sep not in pattern and posix_sep in pattern:
454 # Running on Windows, the pattern has no Windows path separators,
455 # and the pattern has one or more Posix path separators. Replace
456 # the Posix path separators with the Windows path separator.
457 pattern = pattern.replace(posix_sep, sep)
459 if sep not in pattern:
460 name = path.name
461 else:
462 name = str(path)
463 if path.is_absolute() and not os.path.isabs(pattern):
464 pattern = f"*{os.sep}{pattern}"
465 return fnmatch.fnmatch(name, pattern)
468def parts(s: str) -> set[str]:
469 parts = s.split(sep)
470 return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
473def symlink_or_skip(
474 src: os.PathLike[str] | str,
475 dst: os.PathLike[str] | str,
476 **kwargs: Any,
477) -> None:
478 """Make a symlink, or skip the test in case symlinks are not supported."""
479 try:
480 os.symlink(src, dst, **kwargs)
481 except OSError as e:
482 skip(f"symlinks not supported: {e}")
485class ImportMode(Enum):
486 """Possible values for `mode` parameter of `import_path`."""
488 prepend = "prepend"
489 append = "append"
490 importlib = "importlib"
493class ImportPathMismatchError(ImportError):
494 """Raised on import_path() if there is a mismatch of __file__'s.
496 This can happen when `import_path` is called multiple times with different filenames that has
497 the same basename but reside in packages
498 (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
499 """
502def import_path(
503 path: str | os.PathLike[str],
504 *,
505 mode: str | ImportMode = ImportMode.prepend,
506 root: Path,
507 consider_namespace_packages: bool,
508) -> ModuleType:
509 """
510 Import and return a module from the given path, which can be a file (a module) or
511 a directory (a package).
513 :param path:
514 Path to the file to import.
516 :param mode:
517 Controls the underlying import mechanism that will be used:
519 * ImportMode.prepend: the directory containing the module (or package, taking
520 `__init__.py` files into account) will be put at the *start* of `sys.path` before
521 being imported with `importlib.import_module`.
523 * ImportMode.append: same as `prepend`, but the directory will be appended
524 to the end of `sys.path`, if not already in `sys.path`.
526 * ImportMode.importlib: uses more fine control mechanisms provided by `importlib`
527 to import the module, which avoids having to muck with `sys.path` at all. It effectively
528 allows having same-named test modules in different places.
530 :param root:
531 Used as an anchor when mode == ImportMode.importlib to obtain
532 a unique name for the module being imported so it can safely be stored
533 into ``sys.modules``.
535 :param consider_namespace_packages:
536 If True, consider namespace packages when resolving module names.
538 :raises ImportPathMismatchError:
539 If after importing the given `path` and the module `__file__`
540 are different. Only raised in `prepend` and `append` modes.
541 """
542 path = Path(path)
543 mode = ImportMode(mode)
545 if not path.exists():
546 raise ImportError(path)
548 if mode is ImportMode.importlib:
549 # Try to import this module using the standard import mechanisms, but
550 # without touching sys.path.
551 try:
552 _, module_name = resolve_pkg_root_and_module_name(
553 path, consider_namespace_packages=consider_namespace_packages
554 )
555 except CouldNotResolvePathError:
556 pass
557 else:
558 # If the given module name is already in sys.modules, do not import it again.
559 with contextlib.suppress(KeyError):
560 return sys.modules[module_name]
562 mod = _import_module_using_spec(module_name, path, insert_modules=False)
563 if mod is not None:
564 return mod
566 # Could not import the module with the current sys.path, so we fall back
567 # to importing the file as a single module, not being a part of a package.
568 module_name = module_name_from_path(path, root)
569 with contextlib.suppress(KeyError):
570 return sys.modules[module_name]
572 mod = _import_module_using_spec(module_name, path, insert_modules=True)
573 if mod is None:
574 raise ImportError(f"Can't find module {module_name} at location {path}")
575 return mod
577 try:
578 pkg_root, module_name = resolve_pkg_root_and_module_name(
579 path, consider_namespace_packages=consider_namespace_packages
580 )
581 except CouldNotResolvePathError:
582 pkg_root, module_name = path.parent, path.stem
584 # Change sys.path permanently: restoring it at the end of this function would cause surprising
585 # problems because of delayed imports: for example, a conftest.py file imported by this function
586 # might have local imports, which would fail at runtime if we restored sys.path.
587 if mode is ImportMode.append:
588 if str(pkg_root) not in sys.path:
589 sys.path.append(str(pkg_root))
590 elif mode is ImportMode.prepend:
591 if str(pkg_root) != sys.path[0]:
592 sys.path.insert(0, str(pkg_root))
593 else:
594 assert_never(mode)
596 importlib.import_module(module_name)
598 mod = sys.modules[module_name]
599 if path.name == "__init__.py":
600 return mod
602 ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "")
603 if ignore != "1":
604 module_file = mod.__file__
605 if module_file is None:
606 raise ImportPathMismatchError(module_name, module_file, path)
608 if module_file.endswith((".pyc", ".pyo")):
609 module_file = module_file[:-1]
610 if module_file.endswith(os.sep + "__init__.py"):
611 module_file = module_file[: -(len(os.sep + "__init__.py"))]
613 try:
614 is_same = _is_same(str(path), module_file)
615 except FileNotFoundError:
616 is_same = False
618 if not is_same:
619 raise ImportPathMismatchError(module_name, module_file, path)
621 return mod
624def _import_module_using_spec(
625 module_name: str, module_path: Path, *, insert_modules: bool
626) -> ModuleType | None:
627 """
628 Tries to import a module by its canonical name, path, and its parent location.
630 :param module_name:
631 The expected module name, will become the key of `sys.modules`.
633 :param module_path:
634 The file path of the module, for example `/foo/bar/test_demo.py`.
635 If module is a package, pass the path to the `__init__.py` of the package.
636 If module is a namespace package, pass directory path.
638 :param insert_modules:
639 If True, will call `insert_missing_modules` to create empty intermediate modules
640 with made-up module names (when importing test files not reachable from `sys.path`).
642 Example 1 of parent_module_*:
644 module_name: "a.b.c.demo"
645 module_path: Path("a/b/c/demo.py")
646 if "a.b.c" is package ("a/b/c/__init__.py" exists), then
647 parent_module_name: "a.b.c"
648 parent_module_path: Path("a/b/c/__init__.py")
649 else:
650 parent_module_name: "a.b.c"
651 parent_module_path: Path("a/b/c")
653 Example 2 of parent_module_*:
655 module_name: "a.b.c"
656 module_path: Path("a/b/c/__init__.py")
657 if "a.b" is package ("a/b/__init__.py" exists), then
658 parent_module_name: "a.b"
659 parent_module_path: Path("a/b/__init__.py")
660 else:
661 parent_module_name: "a.b"
662 parent_module_path: Path("a/b/")
663 """
664 # Attempt to import the parent module, seems is our responsibility:
665 # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311
666 parent_module_name, _, name = module_name.rpartition(".")
667 parent_module: ModuleType | None = None
668 if parent_module_name:
669 parent_module = sys.modules.get(parent_module_name)
670 # If the parent_module lacks the `__path__` attribute, AttributeError when finding a submodule's spec,
671 # requiring re-import according to the path.
672 need_reimport = not hasattr(parent_module, "__path__")
673 if parent_module is None or need_reimport:
674 # Get parent_location based on location, get parent_path based on path.
675 if module_path.name == "__init__.py":
676 # If the current module is in a package,
677 # need to leave the package first and then enter the parent module.
678 parent_module_path = module_path.parent.parent
679 else:
680 parent_module_path = module_path.parent
682 if (parent_module_path / "__init__.py").is_file():
683 # If the parent module is a package, loading by __init__.py file.
684 parent_module_path = parent_module_path / "__init__.py"
686 parent_module = _import_module_using_spec(
687 parent_module_name,
688 parent_module_path,
689 insert_modules=insert_modules,
690 )
692 # Checking with sys.meta_path first in case one of its hooks can import this module,
693 # such as our own assertion-rewrite hook.
694 find_spec_path = [str(module_path.parent)]
695 for meta_importer in sys.meta_path:
696 spec = meta_importer.find_spec(module_name, find_spec_path)
698 if spec_matches_module_path(spec, module_path):
699 break
700 else:
701 loader = None
702 if module_path.is_dir():
703 # The `spec_from_file_location` matches a loader based on the file extension by default.
704 # For a namespace package, need to manually specify a loader.
705 loader = NamespaceLoader(name, module_path, PathFinder()) # type: ignore[arg-type]
707 spec = importlib.util.spec_from_file_location(
708 module_name, str(module_path), loader=loader
709 )
711 if spec_matches_module_path(spec, module_path):
712 assert spec is not None
713 # Find spec and import this module.
714 mod = importlib.util.module_from_spec(spec)
715 sys.modules[module_name] = mod
716 spec.loader.exec_module(mod) # type: ignore[union-attr]
718 # Set this module as an attribute of the parent module (#12194).
719 if parent_module is not None:
720 setattr(parent_module, name, mod)
722 if insert_modules:
723 insert_missing_modules(sys.modules, module_name)
724 return mod
726 return None
729def spec_matches_module_path(module_spec: ModuleSpec | None, module_path: Path) -> bool:
730 """Return true if the given ModuleSpec can be used to import the given module path."""
731 if module_spec is None:
732 return False
734 if module_spec.origin:
735 return Path(module_spec.origin) == module_path
737 # Compare the path with the `module_spec.submodule_Search_Locations` in case
738 # the module is part of a namespace package.
739 # https://docs.python.org/3/library/importlib.html#importlib.machinery.ModuleSpec.submodule_search_locations
740 if module_spec.submodule_search_locations: # can be None.
741 for path in module_spec.submodule_search_locations:
742 if Path(path) == module_path:
743 return True
745 return False
748# Implement a special _is_same function on Windows which returns True if the two filenames
749# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678).
750if sys.platform.startswith("win"):
752 def _is_same(f1: str, f2: str) -> bool:
753 return Path(f1) == Path(f2) or os.path.samefile(f1, f2)
755else:
757 def _is_same(f1: str, f2: str) -> bool:
758 return os.path.samefile(f1, f2)
761def module_name_from_path(path: Path, root: Path) -> str:
762 """
763 Return a dotted module name based on the given path, anchored on root.
765 For example: path="projects/src/tests/test_foo.py" and root="/projects", the
766 resulting module name will be "src.tests.test_foo".
767 """
768 path = path.with_suffix("")
769 try:
770 relative_path = path.relative_to(root)
771 except ValueError:
772 # If we can't get a relative path to root, use the full path, except
773 # for the first part ("d:\\" or "/" depending on the platform, for example).
774 path_parts = path.parts[1:]
775 else:
776 # Use the parts for the relative path to the root path.
777 path_parts = relative_path.parts
779 # Module name for packages do not contain the __init__ file, unless
780 # the `__init__.py` file is at the root.
781 if len(path_parts) >= 2 and path_parts[-1] == "__init__":
782 path_parts = path_parts[:-1]
784 # Module names cannot contain ".", normalize them to "_". This prevents
785 # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules.
786 # Also, important to replace "." at the start of paths, as those are considered relative imports.
787 path_parts = tuple(x.replace(".", "_") for x in path_parts)
789 return ".".join(path_parts)
792def insert_missing_modules(modules: dict[str, ModuleType], module_name: str) -> None:
793 """
794 Used by ``import_path`` to create intermediate modules when using mode=importlib.
796 When we want to import a module as "src.tests.test_foo" for example, we need
797 to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
798 otherwise "src.tests.test_foo" is not importable by ``__import__``.
799 """
800 module_parts = module_name.split(".")
801 while module_name:
802 parent_module_name, _, child_name = module_name.rpartition(".")
803 if parent_module_name:
804 parent_module = modules.get(parent_module_name)
805 if parent_module is None:
806 try:
807 # If sys.meta_path is empty, calling import_module will issue
808 # a warning and raise ModuleNotFoundError. To avoid the
809 # warning, we check sys.meta_path explicitly and raise the error
810 # ourselves to fall back to creating a dummy module.
811 if not sys.meta_path:
812 raise ModuleNotFoundError
813 parent_module = importlib.import_module(parent_module_name)
814 except ModuleNotFoundError:
815 parent_module = ModuleType(
816 module_name,
817 doc="Empty module created by pytest's importmode=importlib.",
818 )
819 modules[parent_module_name] = parent_module
821 # Add child attribute to the parent that can reference the child
822 # modules.
823 if not hasattr(parent_module, child_name):
824 setattr(parent_module, child_name, modules[module_name])
826 module_parts.pop(-1)
827 module_name = ".".join(module_parts)
830def resolve_package_path(path: Path) -> Path | None:
831 """Return the Python package path by looking for the last
832 directory upwards which still contains an __init__.py.
834 Returns None if it cannot be determined.
835 """
836 result = None
837 for parent in itertools.chain((path,), path.parents):
838 if parent.is_dir():
839 if not (parent / "__init__.py").is_file():
840 break
841 if not parent.name.isidentifier():
842 break
843 result = parent
844 return result
847def resolve_pkg_root_and_module_name(
848 path: Path, *, consider_namespace_packages: bool = False
849) -> tuple[Path, str]:
850 """
851 Return the path to the directory of the root package that contains the
852 given Python file, and its module name:
854 src/
855 app/
856 __init__.py
857 core/
858 __init__.py
859 models.py
861 Passing the full path to `models.py` will yield Path("src") and "app.core.models".
863 If consider_namespace_packages is True, then we additionally check upwards in the hierarchy
864 for namespace packages:
866 https://packaging.python.org/en/latest/guides/packaging-namespace-packages
868 Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files).
869 """
870 pkg_root: Path | None = None
871 pkg_path = resolve_package_path(path)
872 if pkg_path is not None:
873 pkg_root = pkg_path.parent
874 if consider_namespace_packages:
875 start = pkg_root if pkg_root is not None else path.parent
876 for candidate in (start, *start.parents):
877 module_name = compute_module_name(candidate, path)
878 if module_name and is_importable(module_name, path):
879 # Point the pkg_root to the root of the namespace package.
880 pkg_root = candidate
881 break
883 if pkg_root is not None:
884 module_name = compute_module_name(pkg_root, path)
885 if module_name:
886 return pkg_root, module_name
888 raise CouldNotResolvePathError(f"Could not resolve for {path}")
891def is_importable(module_name: str, module_path: Path) -> bool:
892 """
893 Return if the given module path could be imported normally by Python, akin to the user
894 entering the REPL and importing the corresponding module name directly, and corresponds
895 to the module_path specified.
897 :param module_name:
898 Full module name that we want to check if is importable.
899 For example, "app.models".
901 :param module_path:
902 Full path to the python module/package we want to check if is importable.
903 For example, "/projects/src/app/models.py".
904 """
905 try:
906 # Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through
907 # sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``).
908 # Using importlib.util.find_spec() is different, it gives the same results as trying to import
909 # the module normally in the REPL.
910 spec = importlib.util.find_spec(module_name)
911 except (ImportError, ValueError, ImportWarning):
912 return False
913 else:
914 return spec_matches_module_path(spec, module_path)
917def compute_module_name(root: Path, module_path: Path) -> str | None:
918 """Compute a module name based on a path and a root anchor."""
919 try:
920 path_without_suffix = module_path.with_suffix("")
921 except ValueError:
922 # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter).
923 return None
925 try:
926 relative = path_without_suffix.relative_to(root)
927 except ValueError: # pragma: no cover
928 return None
929 names = list(relative.parts)
930 if not names:
931 return None
932 if names[-1] == "__init__":
933 names.pop()
934 return ".".join(names)
937class CouldNotResolvePathError(Exception):
938 """Custom exception raised by resolve_pkg_root_and_module_name."""
941def scandir(
942 path: str | os.PathLike[str],
943 sort_key: Callable[[os.DirEntry[str]], object] = lambda entry: entry.name,
944) -> list[os.DirEntry[str]]:
945 """Scan a directory recursively, in breadth-first order.
947 The returned entries are sorted according to the given key.
948 The default is to sort by name.
949 If the directory does not exist, return an empty list.
950 """
951 entries = []
952 # Attempt to create a scandir iterator for the given path.
953 try:
954 scandir_iter = os.scandir(path)
955 except FileNotFoundError:
956 # If the directory does not exist, return an empty list.
957 return []
958 # Use the scandir iterator in a context manager to ensure it is properly closed.
959 with scandir_iter as s:
960 for entry in s:
961 try:
962 entry.is_file()
963 except OSError as err:
964 if _ignore_error(err):
965 continue
966 # Reraise non-ignorable errors to avoid hiding issues.
967 raise
968 entries.append(entry)
969 entries.sort(key=sort_key) # type: ignore[arg-type]
970 return entries
973def visit(
974 path: str | os.PathLike[str], recurse: Callable[[os.DirEntry[str]], bool]
975) -> Iterator[os.DirEntry[str]]:
976 """Walk a directory recursively, in breadth-first order.
978 The `recurse` predicate determines whether a directory is recursed.
980 Entries at each directory level are sorted.
981 """
982 entries = scandir(path)
983 yield from entries
984 for entry in entries:
985 if entry.is_dir() and recurse(entry):
986 yield from visit(entry.path, recurse)
989def absolutepath(path: str | os.PathLike[str]) -> Path:
990 """Convert a path to an absolute path using os.path.abspath.
992 Prefer this over Path.resolve() (see #6523).
993 Prefer this over Path.absolute() (not public, doesn't normalize).
994 """
995 return Path(os.path.abspath(path))
998def commonpath(path1: Path, path2: Path) -> Path | None:
999 """Return the common part shared with the other path, or None if there is
1000 no common part.
1002 If one path is relative and one is absolute, returns None.
1003 """
1004 try:
1005 return Path(os.path.commonpath((str(path1), str(path2))))
1006 except ValueError:
1007 return None
1010def bestrelpath(directory: Path, dest: Path) -> str:
1011 """Return a string which is a relative path from directory to dest such
1012 that directory/bestrelpath == dest.
1014 The paths must be either both absolute or both relative.
1016 If no such path can be determined, returns dest.
1017 """
1018 assert isinstance(directory, Path)
1019 assert isinstance(dest, Path)
1020 if dest == directory:
1021 return os.curdir
1022 # Find the longest common directory.
1023 base = commonpath(directory, dest)
1024 # Can be the case on Windows for two absolute paths on different drives.
1025 # Can be the case for two relative paths without common prefix.
1026 # Can be the case for a relative path and an absolute path.
1027 if not base:
1028 return str(dest)
1029 reldirectory = directory.relative_to(base)
1030 reldest = dest.relative_to(base)
1031 return os.path.join(
1032 # Back from directory to base.
1033 *([os.pardir] * len(reldirectory.parts)),
1034 # Forward from base to dest.
1035 *reldest.parts,
1036 )
1039def safe_exists(p: Path) -> bool:
1040 """Like Path.exists(), but account for input arguments that might be too long (#11394)."""
1041 try:
1042 return p.exists()
1043 except (ValueError, OSError):
1044 # ValueError: stat: path too long for Windows
1045 # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
1046 return False
1049def samefile_nofollow(p1: Path, p2: Path) -> bool:
1050 """Test whether two paths reference the same actual file or directory.
1052 Unlike Path.samefile(), does not resolve symlinks.
1053 """
1054 return os.path.samestat(p1.lstat(), p2.lstat())