Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/_pytest/pathlib.py: 22%
394 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:23 +0000
1import atexit
2import contextlib
3import fnmatch
4import importlib.util
5import itertools
6import os
7import shutil
8import sys
9import types
10import uuid
11import warnings
12from enum import Enum
13from errno import EBADF
14from errno import ELOOP
15from errno import ENOENT
16from errno import ENOTDIR
17from functools import partial
18from os.path import expanduser
19from os.path import expandvars
20from os.path import isabs
21from os.path import sep
22from pathlib import Path
23from pathlib import PurePath
24from posixpath import sep as posix_sep
25from types import ModuleType
26from typing import Callable
27from typing import Dict
28from typing import Iterable
29from typing import Iterator
30from typing import Optional
31from typing import Set
32from typing import Tuple
33from typing import Type
34from typing import TypeVar
35from typing import Union
37from _pytest.compat import assert_never
38from _pytest.outcomes import skip
39from _pytest.warning_types import PytestWarning
# 60 * 60 * 24 * 3 == 259200, i.e. three days when read as seconds.
# NOTE(review): not referenced inside this module; presumably the default lock
# timeout supplied by callers -- confirm.
LOCK_TIMEOUT = 60 * 60 * 24 * 3


# Type variable restricted to PurePath and its subclasses.
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)

# The following function, variables and comments were
# copied from cpython 3.9 Lib/pathlib.py file.

# errno values that _ignore_error() reports as ignorable.
# EBADF - guard against macOS `stat` throwing EBADF
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)

# Windows error codes that _ignore_error() reports as ignorable.
_IGNORED_WINERRORS = (
    # ERROR_NOT_READY - drive exists but is not accessible
    21,
    # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
    1921,
)
def _ignore_error(exception):
    """Tell whether ``exception`` carries one of the ignorable error codes.

    Returns True when the exception's ``errno`` attribute is listed in
    ``_IGNORED_ERRORS`` or its ``winerror`` attribute is listed in
    ``_IGNORED_WINERRORS``. A missing attribute is treated as ``None``.
    """
    errno_value = getattr(exception, "errno", None)
    if errno_value in _IGNORED_ERRORS:
        return True
    winerror_value = getattr(exception, "winerror", None)
    return winerror_value in _IGNORED_WINERRORS
def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """Return the ``.lock`` entry directly below ``path``."""
    return path / ".lock"
def on_rm_rf_error(
    func,
    path: str,
    excinfo: Union[
        BaseException,
        Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]],
    ],
    *,
    start_path: Path,
) -> bool:
    """Error callback for rmtree that retries after fixing permissions.

    :param func: The function that failed while removing ``path``.
    :param path: The path that could not be removed.
    :param excinfo:
        Either the exception instance or an ``(type, value, traceback)``
        triple; both forms are accepted.
    :param start_path:
        The path originally handed to rm_rf; the upward chmod walk stops there.
    :returns:
        True if the permission fix and retry were performed, False otherwise.
        The returned value is used only by our own tests.
    """
    error = excinfo if isinstance(excinfo, BaseException) else excinfo[1]

    # Another process removed the file in the middle of the "rm_rf" (xdist for example).
    # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(error, FileNotFoundError):
        return False

    # Anything other than a permission problem is reported and given up on.
    if not isinstance(error, PermissionError):
        warnings.warn(
            PytestWarning(f"(rm_rf) error removing {path}\n{type(error)}: {error}")
        )
        return False

    removal_functions = (os.rmdir, os.remove, os.unlink)
    if func not in removal_functions:
        # A failing os.open is skipped without a warning; any other
        # unexpected function is reported.
        if func not in (os.open,):
            warnings.warn(
                PytestWarning(
                    "(rm_rf) unknown function {} when removing {}:\n{}: {}".format(
                        func, path, type(error), error
                    )
                )
            )
        return False

    # Chmod + retry.
    import stat

    def make_user_rw(target: str) -> None:
        current_mode = os.stat(target).st_mode
        os.chmod(target, current_mode | stat.S_IRUSR | stat.S_IWUSR)

    # For files, we need to recursively go upwards in the directories to
    # ensure they all are also writable.
    failed_path = Path(path)
    if failed_path.is_file():
        for ancestor in failed_path.parents:
            make_user_rw(str(ancestor))
            # Stop when we reach the original path passed to rm_rf.
            if ancestor == start_path:
                break
    make_user_rw(str(path))

    func(path)
    return True
def ensure_extended_length_path(path: Path) -> Path:
    """Get the extended-length version of a path (Windows).

    On Windows, by default, the maximum length of a path (MAX_PATH) is 260
    characters, and operations on paths longer than that fail. But it is possible
    to overcome this by converting the path to "extended-length" form before
    performing the operation:
    https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation

    On Windows, this function returns the extended-length absolute version of path.
    On other platforms it returns path unchanged.
    """
    if not sys.platform.startswith("win32"):
        return path
    resolved = path.resolve()
    return Path(get_extended_length_path_str(str(resolved)))
def get_extended_length_path_str(path: str) -> str:
    """Convert a path to a Windows extended length path.

    A path that already carries an extended-length prefix is returned as is.
    A path starting with two backslashes gets the UNC extended-length prefix
    in place of those two backslashes; any other path gets the plain
    extended-length prefix prepended.
    """
    long_path_prefix = "\\\\?\\"
    unc_long_path_prefix = "\\\\?\\UNC\\"

    already_extended = path.startswith((long_path_prefix, unc_long_path_prefix))
    if already_extended:
        return path

    is_unc = path.startswith("\\\\")
    if is_unc:
        return unc_long_path_prefix + path[2:]
    return long_path_prefix + path
def rm_rf(path: Path) -> None:
    """Recursively delete ``path``, coping with read-only elements.

    Removal errors are routed to on_rm_rf_error, bound to the
    (extended-length) path being removed.
    """
    path = ensure_extended_length_path(path)
    error_handler = partial(on_rm_rf_error, start_path=path)
    # The handler is passed as ``onexc`` on Python 3.12+ and as ``onerror``
    # on older versions.
    if sys.version_info < (3, 12):
        shutil.rmtree(str(path), onerror=error_handler)
    else:
        shutil.rmtree(str(path), onexc=error_handler)
def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
    """Yield the entries of ``root`` whose name starts with ``prefix``.

    The comparison ignores case: both the prefix and each entry name are
    lower-cased before being compared.
    """
    wanted = prefix.lower()
    yield from (
        entry for entry in root.iterdir() if entry.name.lower().startswith(wanted)
    )
def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
    """Yield, for each path, the part of its name after the prefix.

    The first ``len(prefix)`` characters of each name are dropped; the names
    are not checked for actually starting with the prefix.

    :param iter: Iterator over path names.
    :param prefix: Expected prefix of the path names.
    """
    skip = len(prefix)
    yield from (entry.name[skip:] for entry in iter)
def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """Combine find_prefixed and extract_suffixes.

    Yields the name remainder (after ``prefix``) of every entry in ``root``
    that find_prefixed matches.
    """
    matching_entries = find_prefixed(root, prefix)
    return extract_suffixes(matching_entries, prefix)
def parse_num(maybe_num) -> int:
    """Convert a path suffix to an int, giving -1 when ``int()`` rejects it."""
    try:
        number = int(maybe_num)
    except ValueError:
        return -1
    return number
def _force_symlink(
    root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
    """Helper to create the current symlink.

    Any existing ``root/target`` entry is unlinked first, then a symlink
    pointing at ``link_to`` is created in its place. Failures of either step
    are ignored.

    It's full of race conditions that are reasonably OK to ignore
    for the context of best effort linking to the latest test run.

    The presumption being that in case of much parallelism
    the inaccuracy is going to be acceptable.
    """
    link = root.joinpath(target)
    with contextlib.suppress(OSError):
        link.unlink()
    # Best effort: nothing raised while creating the link is propagated.
    with contextlib.suppress(Exception):
        link.symlink_to(link_to)
def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
    """Create ``root/<prefix><N>`` where N is one above the highest existing number.

    After a successful creation the ``<prefix>current`` symlink is pointed at
    the new directory (best effort).

    :raises OSError: If no directory could be created within 10 attempts.
    """
    for _attempt in range(10):
        # Rescan on every attempt: a failed mkdir may mean another process
        # took the number in the meantime.
        highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        candidate = root.joinpath(f"{prefix}{highest + 1}")
        try:
            candidate.mkdir(mode=mode)
        except Exception:
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate

    raise OSError(
        "could not create numbered dir with prefix "
        "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
    )
def create_cleanup_lock(p: Path) -> Path:
    """Create a lock to prevent premature folder cleanup.

    The lock is the file returned by get_lock_path(p). It is created
    exclusively and the current process id is written into it.

    :returns: The path of the newly created lock file.
    :raises OSError:
        If the lock file already exists, or if it is no longer a regular
        file right after having been created.
    """
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise OSError(f"cannot create lockfile in {p}") from e

    # Close the descriptor even if writing the pid fails, so that a failed
    # write does not leak it.
    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        os.close(fd)

    if not lock_path.is_file():
        raise OSError("lock path got renamed after successful creation")
    return lock_path
def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
    """Register a cleanup function for removing a lock, by default on atexit.

    The callback unlinks ``lock_path`` and ignores OSError. It does nothing
    when run in a process whose pid differs from the pid seen at
    registration time.

    :returns: Whatever ``register`` returns for the callback.
    """
    registering_pid = os.getpid()

    def cleanup_on_exit(
        lock_path: Path = lock_path, original_pid: int = registering_pid
    ) -> None:
        if os.getpid() != original_pid:
            # fork
            return
        with contextlib.suppress(OSError):
            lock_path.unlink()

    return register(cleanup_on_exit)
def maybe_delete_a_numbered_dir(path: Path) -> None:
    """Remove a numbered directory if its lock can be obtained and it does
    not seem to be in use.

    The directory is first renamed to a uniquely named ``garbage-<uuid>``
    sibling and then removed recursively. Any OSError along the way makes the
    function give up silently.
    """
    path = ensure_extended_length_path(path)
    lock_path = None
    try:
        lock_path = create_cleanup_lock(path)
        garbage = path.parent.joinpath(f"garbage-{uuid.uuid4()}")
        path.rename(garbage)
        rm_rf(garbage)
    except OSError:
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        return
    finally:
        # If we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir.
        if lock_path is not None:
            with contextlib.suppress(OSError):
                lock_path.unlink()
def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """Check if `path` is deletable based on whether the lock file is expired.

    :param consider_lock_dead_if_created_before:
        Cut-off compared against the lock file's modification time; a lock
        older than this is treated as dead and removed.
    :returns:
        False for symlinks, for inaccessible locks and for locks that are not
        expired; True when there is no lock file or an expired lock was removed.
    """
    if path.is_symlink():
        return False

    lock = get_lock_path(path)
    try:
        lock_present = lock.is_file()
    except OSError:
        # we might not have access to the lock file at all, in this case assume
        # we don't have access to the entire directory (#7491).
        return False
    if not lock_present:
        return True

    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False

    if not lock_time < consider_lock_dead_if_created_before:
        return False

    # We want to ignore any errors while trying to remove the lock such as:
    # - PermissionDenied, like the file permissions have changed since the lock creation;
    # - FileNotFoundError, in case another pytest process got here first;
    # and any other cause of failure.
    # NOTE(review): the extracted source lost its indentation; this assumes
    # the success return sits inside the suppress block (a failed unlink
    # falls through to False) -- confirm against upstream.
    with contextlib.suppress(OSError):
        lock.unlink()
        return True
    return False
def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """Delete ``path`` via maybe_delete_a_numbered_dir, but only when
    ensure_deletable approves it."""
    if not ensure_deletable(path, consider_lock_dead_if_created_before):
        return
    maybe_delete_a_numbered_dir(path)
def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """List candidates for numbered directories to be removed - follows py.path.

    An entry is a candidate when its numeric suffix is at most
    ``highest existing number - keep``. Entries whose suffix is not a number
    parse as -1 and are compared the same way.
    """
    highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = highest - keep
    prefix_length = len(prefix)
    for candidate in find_prefixed(root, prefix):
        if parse_num(candidate.name[prefix_length:]) <= threshold:
            yield candidate
def cleanup_dead_symlinks(root: Path):
    """Unlink every symlink directly inside ``root`` whose resolved target
    does not exist."""
    for entry in root.iterdir():
        if entry.is_symlink() and not entry.resolve().exists():
            entry.unlink()
def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """Cleanup for lock driven numbered directories.

    Does nothing when ``root`` does not exist. Otherwise tries to clean up, in
    order: the candidates reported by cleanup_candidates, any leftover
    ``garbage-*`` entries, and finally dead symlinks inside ``root``.
    """
    if root.exists():
        for candidate in cleanup_candidates(root, prefix, keep):
            try_cleanup(candidate, consider_lock_dead_if_created_before)
        for leftover in root.glob("garbage-*"):
            try_cleanup(leftover, consider_lock_dead_if_created_before)

        cleanup_dead_symlinks(root)
def make_numbered_dir_with_cleanup(
    root: Path,
    prefix: str,
    keep: int,
    lock_timeout: float,
    mode: int,
) -> Path:
    """Create a numbered dir with a cleanup lock and remove old ones.

    Up to 10 attempts are made. On success, cleanup_numbered_dir is registered
    to run at interpreter exit, with locks older than the new directory's
    mtime minus ``lock_timeout`` considered dead.

    :raises Exception: The error of the last attempt, if all attempts failed.
    """
    last_error = None
    for _attempt in range(10):
        try:
            new_dir = make_numbered_dir(root, prefix, mode)
            # Only lock the current dir when keep is not 0
            if keep != 0:
                register_cleanup_lock_removal(create_cleanup_lock(new_dir))
        except Exception as exc:
            last_error = exc
            continue

        dead_lock_cutoff = new_dir.stat().st_mtime - lock_timeout
        # Register a cleanup for program exit
        atexit.register(cleanup_numbered_dir, root, prefix, keep, dead_lock_cutoff)
        return new_dir

    assert last_error is not None
    raise last_error
def resolve_from_str(input: str, rootpath: Path) -> Path:
    """Turn ``input`` into a Path after expanding ``~`` and environment variables.

    An absolute result is returned as is; a relative one is joined onto
    ``rootpath``.
    """
    expanded = expandvars(expanduser(input))
    return Path(expanded) if isabs(expanded) else rootpath.joinpath(expanded)
def fnmatch_ex(pattern: str, path: Union[str, "os.PathLike[str]"]) -> bool:
    """A port of FNMatcher from py.path.common which works with PurePath() instances.

    The difference between this algorithm and PurePath.match() is that the
    latter matches "**" glob expressions for each part of the path, while
    this algorithm uses the whole path instead.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    This algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according this logic.

    A pattern without a path separator is matched against the file name only;
    otherwise it is matched against the whole path, with ``*<sep>`` prepended
    to a relative pattern when the path is absolute.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    path = PurePath(path)
    on_windows = sys.platform.startswith("win")

    if on_windows and posix_sep in pattern and sep not in pattern:
        # Running on Windows, the pattern has no Windows path separators,
        # and the pattern has one or more Posix path separators. Replace
        # the Posix path separators with the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    if sep in pattern:
        candidate = str(path)
        if path.is_absolute() and not os.path.isabs(pattern):
            pattern = f"*{os.sep}{pattern}"
    else:
        candidate = path.name
    return fnmatch.fnmatch(candidate, pattern)
def parts(s: str) -> Set[str]:
    """Return every leading sub-path of ``s``, split on the OS path separator.

    An empty leading piece (as produced by a path starting with the
    separator) is represented by the separator itself.
    """
    pieces = s.split(sep)
    prefixes = set()
    for end in range(1, len(pieces) + 1):
        prefixes.add(sep.join(pieces[:end]) or sep)
    return prefixes
def symlink_or_skip(src, dst, **kwargs):
    """Make a symlink, or skip the test in case symlinks are not supported.

    ``src`` and ``dst`` are converted with ``str()``; extra keyword arguments
    are forwarded to ``os.symlink``. An OSError leads to ``skip()`` being
    called with the error in its message.
    """
    try:
        os.symlink(str(src), str(dst), **kwargs)
    except OSError as error:
        skip(f"symlinks not supported: {error}")
class ImportMode(Enum):
    """Possible values for `mode` parameter of `import_path`."""

    # Put the module's base directory at the start of sys.path.
    prepend = "prepend"
    # Put the module's base directory at the end of sys.path.
    append = "append"
    # Import through importlib without changing sys.path.
    importlib = "importlib"
class ImportPathMismatchError(ImportError):
    """Raised on import_path() if there is a mismatch of __file__'s.

    This can happen when `import_path` is called multiple times with different
    filenames that have the same basename but reside in different packages
    (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
    """
def import_path(
    p: Union[str, "os.PathLike[str]"],
    *,
    mode: Union[str, ImportMode] = ImportMode.prepend,
    root: Path,
) -> ModuleType:
    """Import and return a module from the given path, which can be a file (a module) or
    a directory (a package).

    The import mechanism used is controlled by the `mode` parameter:

    * `mode == ImportMode.prepend`: the directory containing the module (or package, taking
      `__init__.py` files into account) will be put at the *start* of `sys.path` before
      being imported with `importlib.import_module`.

    * `mode == ImportMode.append`: same as `prepend`, but the directory will be appended
      to the end of `sys.path`, if not already in `sys.path`.

    * `mode == ImportMode.importlib`: uses more fine control mechanisms provided by `importlib`
      to import the module, which avoids having to muck with `sys.path` at all. It effectively
      allows having same-named test modules in different places.

    :param root:
        Used as an anchor when mode == ImportMode.importlib to obtain
        a unique name for the module being imported so it can safely be stored
        into ``sys.modules``.

    :raises ImportError:
        If the path does not exist, or (importlib mode) no spec can be found.
    :raises ImportPathMismatchError:
        If after importing the given `path` and the module `__file__`
        are different. Only raised in `prepend` and `append` modes.
    """
    mode = ImportMode(mode)

    path = Path(p)

    if not path.exists():
        raise ImportError(path)

    if mode is ImportMode.importlib:
        module_name = module_name_from_path(path, root)

        # Ask each meta path finder in turn; fall back to a plain
        # file-location spec only when none of them yields a spec.
        for finder in sys.meta_path:
            spec = finder.find_spec(module_name, [str(path.parent)])
            if spec is not None:
                break
        else:
            spec = importlib.util.spec_from_file_location(module_name, str(path))

        if spec is None:
            raise ImportError(f"Can't find module {module_name} at location {path}")
        imported = importlib.util.module_from_spec(spec)
        # The module is registered before it is executed.
        sys.modules[module_name] = imported
        spec.loader.exec_module(imported)  # type: ignore[union-attr]
        insert_missing_modules(sys.modules, module_name)
        return imported

    package_dir = resolve_package_path(path)
    if package_dir is None:
        # Not inside a package: import by bare file stem from its directory.
        base_dir = path.parent
        module_name = path.stem
    else:
        base_dir = package_dir.parent
        dotted = list(path.with_suffix("").relative_to(base_dir).parts)
        if dotted[-1] == "__init__":
            dotted.pop()
        module_name = ".".join(dotted)

    # Change sys.path permanently: restoring it at the end of this function would cause surprising
    # problems because of delayed imports: for example, a conftest.py file imported by this function
    # might have local imports, which would fail at runtime if we restored sys.path.
    base_entry = str(base_dir)
    if mode is ImportMode.append:
        if base_entry not in sys.path:
            sys.path.append(base_entry)
    elif mode is ImportMode.prepend:
        if base_entry != sys.path[0]:
            sys.path.insert(0, base_entry)
    else:
        assert_never(mode)

    importlib.import_module(module_name)

    mod = sys.modules[module_name]
    if path.name == "__init__.py":
        return mod

    # Setting PY_IGNORE_IMPORTMISMATCH=1 disables the __file__ consistency check.
    if os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") == "1":
        return mod

    module_file = mod.__file__
    if module_file is None:
        raise ImportPathMismatchError(module_name, module_file, path)

    # Normalize compiled file names and package __init__ files before comparing.
    if module_file.endswith((".pyc", ".pyo")):
        module_file = module_file[:-1]
    init_tail = os.sep + "__init__.py"
    if module_file.endswith(init_tail):
        module_file = module_file[: -len(init_tail)]

    try:
        is_same = _is_same(str(path), module_file)
    except FileNotFoundError:
        is_same = False

    if not is_same:
        raise ImportPathMismatchError(module_name, module_file, path)

    return mod
# Implement a special _is_same function on Windows which returns True if the two filenames
# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678).
if sys.platform.startswith("win"):

    def _is_same(f1: str, f2: str) -> bool:
        """Return True if both names compare equal as paths or refer to the same file."""
        if Path(f1) == Path(f2):
            return True
        return os.path.samefile(f1, f2)

else:

    def _is_same(f1: str, f2: str) -> bool:
        """Return True if both names refer to the same file."""
        return os.path.samefile(f1, f2)
def module_name_from_path(path: Path, root: Path) -> str:
    """
    Return a dotted module name based on the given path, anchored on root.

    For example: path="/projects/src/tests/test_foo.py" and root="/projects", the
    resulting module name will be "src.tests.test_foo".
    """
    without_suffix = path.with_suffix("")
    try:
        # Use the parts for the relative path to the root path.
        name_parts = without_suffix.relative_to(root).parts
    except ValueError:
        # If we can't get a relative path to root, use the full path, except
        # for the first part ("d:\\" or "/" depending on the platform, for example).
        name_parts = without_suffix.parts[1:]

    return ".".join(name_parts)
def insert_missing_modules(modules: Dict[str, ModuleType], module_name: str) -> None:
    """
    Used by ``import_path`` to create intermediate modules when using mode=importlib.

    When we want to import a module as "src.tests.test_foo" for example, we need
    to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
    otherwise "src.tests.test_foo" is not importable by ``__import__``.

    Walks from ``module_name`` up through each parent name; every name missing
    from ``modules`` that also cannot be imported gets an empty placeholder
    module stored in ``modules``.
    """
    while module_name:
        if module_name not in modules:
            try:
                # If sys.meta_path is empty, calling import_module will issue
                # a warning and raise ModuleNotFoundError. To avoid the
                # warning, we check sys.meta_path explicitly and raise the error
                # ourselves to fall back to creating a dummy module.
                if not sys.meta_path:
                    raise ModuleNotFoundError
                importlib.import_module(module_name)
            except ModuleNotFoundError:
                modules[module_name] = ModuleType(
                    module_name,
                    doc="Empty module created by pytest's importmode=importlib.",
                )
        # Move on to the parent name; this becomes "" once no dot is left.
        module_name = module_name.rpartition(".")[0]
def resolve_package_path(path: Path) -> Optional[Path]:
    """Return the Python package path by looking for the last
    directory upwards which still contains an __init__.py.

    The upward walk stops at the first directory that has no ``__init__.py``
    or whose name is not a valid identifier. Non-directories are skipped.

    Returns None if it can not be determined.
    """
    package_root = None
    for candidate in (path, *path.parents):
        if not candidate.is_dir():
            continue
        has_init = candidate.joinpath("__init__.py").is_file()
        if not (has_init and candidate.name.isidentifier()):
            break
        package_root = candidate
    return package_root
def visit(
    path: Union[str, "os.PathLike[str]"], recurse: Callable[["os.DirEntry[str]"], bool]
) -> Iterator["os.DirEntry[str]"]:
    """Walk a directory recursively, in breadth-first order.

    All entries of ``path`` are yielded first, sorted by name; afterwards each
    directory entry for which ``recurse(entry)`` is true is walked the same way.

    Entries at each directory level are sorted.

    :param path: Directory to walk.
    :param recurse: Predicate deciding whether a directory entry is descended into.
    :raises OSError:
        If probing an entry fails with an error that ``_ignore_error`` does
        not classify as ignorable.
    """
    entries = []
    # The context manager closes the scandir iterator even when the loop is
    # left early by the re-raise below.
    with os.scandir(path) as scan:
        # Skip entries with symlink loops and other brokenness, so the caller doesn't
        # have to deal with it.
        for entry in scan:
            try:
                entry.is_file()
            except OSError as err:
                if _ignore_error(err):
                    continue
                raise
            entries.append(entry)

    entries.sort(key=lambda entry: entry.name)

    yield from entries

    for entry in entries:
        if entry.is_dir() and recurse(entry):
            yield from visit(entry.path, recurse)
def absolutepath(path: Union[Path, str]) -> Path:
    """Convert a path to an absolute path using os.path.abspath.

    Prefer this over Path.resolve() (see #6523).
    Prefer this over Path.absolute() (not public, doesn't normalize).
    """
    absolute = os.path.abspath(str(path))
    return Path(absolute)
def commonpath(path1: Path, path2: Path) -> Optional[Path]:
    """Return the common part shared with the other path, or None if there is
    no common part.

    If one path is relative and one is absolute, returns None.
    """
    try:
        shared = os.path.commonpath((str(path1), str(path2)))
    except ValueError:
        return None
    return Path(shared)
def bestrelpath(directory: Path, dest: Path) -> str:
    """Return a string which is a relative path from directory to dest such
    that directory/bestrelpath == dest.

    The paths must be either both absolute or both relative.

    If no such path can be determined, returns dest.
    """
    assert isinstance(directory, Path)
    assert isinstance(dest, Path)
    if dest == directory:
        return os.curdir

    # Find the longest common directory.
    base = commonpath(directory, dest)
    # Can be the case on Windows for two absolute paths on different drives.
    # Can be the case for two relative paths without common prefix.
    # Can be the case for a relative path and an absolute path.
    if not base:
        return str(dest)

    # Back from directory to base.
    climb = [os.pardir] * len(directory.relative_to(base).parts)
    # Forward from base to dest.
    descend = dest.relative_to(base).parts
    return os.path.join(*climb, *descend)
# Originates from py.path.local.copy(), with significant trims and adjustments.
752# TODO(py38): Replace with shutil.copytree(..., symlinks=True, dirs_exist_ok=True)
def copytree(source: Path, target: Path) -> None:
    """Recursively copy a source directory to target.

    Symlinks are recreated as symlinks with the same link text and are not
    descended into; regular files are copied; directories are created.
    """
    assert source.is_dir()

    def descend_into(entry) -> bool:
        # Symlinked directories are copied as links, never walked.
        return not entry.is_symlink()

    for entry in visit(source, recurse=descend_into):
        src_item = Path(entry)
        dst_item = target / src_item.relative_to(source)
        dst_item.parent.mkdir(exist_ok=True)
        if src_item.is_symlink():
            dst_item.symlink_to(os.readlink(src_item))
        elif src_item.is_file():
            shutil.copyfile(src_item, dst_item)
        elif src_item.is_dir():
            dst_item.mkdir(exist_ok=True)