Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/_pytest/pathlib.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

475 statements  

1from __future__ import annotations 

2 

3import atexit 

4import contextlib 

5from enum import Enum 

6from errno import EBADF 

7from errno import ELOOP 

8from errno import ENOENT 

9from errno import ENOTDIR 

10import fnmatch 

11from functools import partial 

12from importlib.machinery import ModuleSpec 

13from importlib.machinery import PathFinder 

14import importlib.util 

15import itertools 

16import os 

17from os.path import expanduser 

18from os.path import expandvars 

19from os.path import isabs 

20from os.path import sep 

21from pathlib import Path 

22from pathlib import PurePath 

23from posixpath import sep as posix_sep 

24import shutil 

25import sys 

26import types 

27from types import ModuleType 

28from typing import Any 

29from typing import Callable 

30from typing import Iterable 

31from typing import Iterator 

32from typing import TypeVar 

33import uuid 

34import warnings 

35 

36from _pytest.compat import assert_never 

37from _pytest.outcomes import skip 

38from _pytest.warning_types import PytestWarning 

39 

40 

41if sys.version_info < (3, 11): 

42 from importlib._bootstrap_external import _NamespaceLoader as NamespaceLoader 

43else: 

44 from importlib.machinery import NamespaceLoader 

45 

46LOCK_TIMEOUT = 60 * 60 * 24 * 3 

47 

48_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath) 

49 

50# The following function, variables and comments were 

51# copied from cpython 3.9 Lib/pathlib.py file. 

52 

53# EBADF - guard against macOS `stat` throwing EBADF 

54_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP) 

55 

56_IGNORED_WINERRORS = ( 

57 21, # ERROR_NOT_READY - drive exists but is not accessible 

58 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself 

59) 

60 

61 

62def _ignore_error(exception: Exception) -> bool: 

63 return ( 

64 getattr(exception, "errno", None) in _IGNORED_ERRORS 

65 or getattr(exception, "winerror", None) in _IGNORED_WINERRORS 

66 ) 

67 

68 

69def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: 

70 return path.joinpath(".lock") 

71 

72 

73def on_rm_rf_error( 

74 func: Callable[..., Any] | None, 

75 path: str, 

76 excinfo: BaseException 

77 | tuple[type[BaseException], BaseException, types.TracebackType | None], 

78 *, 

79 start_path: Path, 

80) -> bool: 

81 """Handle known read-only errors during rmtree. 

82 

83 The returned value is used only by our own tests. 

84 """ 

85 if isinstance(excinfo, BaseException): 

86 exc = excinfo 

87 else: 

88 exc = excinfo[1] 

89 

90 # Another process removed the file in the middle of the "rm_rf" (xdist for example). 

91 # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 

92 if isinstance(exc, FileNotFoundError): 

93 return False 

94 

95 if not isinstance(exc, PermissionError): 

96 warnings.warn( 

97 PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}") 

98 ) 

99 return False 

100 

101 if func not in (os.rmdir, os.remove, os.unlink): 

102 if func not in (os.open,): 

103 warnings.warn( 

104 PytestWarning( 

105 f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}" 

106 ) 

107 ) 

108 return False 

109 

110 # Chmod + retry. 

111 import stat 

112 

113 def chmod_rw(p: str) -> None: 

114 mode = os.stat(p).st_mode 

115 os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) 

116 

117 # For files, we need to recursively go upwards in the directories to 

118 # ensure they all are also writable. 

119 p = Path(path) 

120 if p.is_file(): 

121 for parent in p.parents: 

122 chmod_rw(str(parent)) 

123 # Stop when we reach the original path passed to rm_rf. 

124 if parent == start_path: 

125 break 

126 chmod_rw(str(path)) 

127 

128 func(path) 

129 return True 

130 

131 

132def ensure_extended_length_path(path: Path) -> Path: 

133 """Get the extended-length version of a path (Windows). 

134 

135 On Windows, by default, the maximum length of a path (MAX_PATH) is 260 

136 characters, and operations on paths longer than that fail. But it is possible 

137 to overcome this by converting the path to "extended-length" form before 

138 performing the operation: 

139 https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation 

140 

141 On Windows, this function returns the extended-length absolute version of path. 

142 On other platforms it returns path unchanged. 

143 """ 

144 if sys.platform.startswith("win32"): 

145 path = path.resolve() 

146 path = Path(get_extended_length_path_str(str(path))) 

147 return path 

148 

149 

150def get_extended_length_path_str(path: str) -> str: 

151 """Convert a path to a Windows extended length path.""" 

152 long_path_prefix = "\\\\?\\" 

153 unc_long_path_prefix = "\\\\?\\UNC\\" 

154 if path.startswith((long_path_prefix, unc_long_path_prefix)): 

155 return path 

156 # UNC 

157 if path.startswith("\\\\"): 

158 return unc_long_path_prefix + path[2:] 

159 return long_path_prefix + path 

160 

161 

162def rm_rf(path: Path) -> None: 

163 """Remove the path contents recursively, even if some elements 

164 are read-only.""" 

165 path = ensure_extended_length_path(path) 

166 onerror = partial(on_rm_rf_error, start_path=path) 

167 if sys.version_info >= (3, 12): 

168 shutil.rmtree(str(path), onexc=onerror) 

169 else: 

170 shutil.rmtree(str(path), onerror=onerror) 

171 

172 

173def find_prefixed(root: Path, prefix: str) -> Iterator[os.DirEntry[str]]: 

174 """Find all elements in root that begin with the prefix, case-insensitive.""" 

175 l_prefix = prefix.lower() 

176 for x in os.scandir(root): 

177 if x.name.lower().startswith(l_prefix): 

178 yield x 

179 

180 

181def extract_suffixes(iter: Iterable[os.DirEntry[str]], prefix: str) -> Iterator[str]: 

182 """Return the parts of the paths following the prefix. 

183 

184 :param iter: Iterator over path names. 

185 :param prefix: Expected prefix of the path names. 

186 """ 

187 p_len = len(prefix) 

188 for entry in iter: 

189 yield entry.name[p_len:] 

190 

191 

192def find_suffixes(root: Path, prefix: str) -> Iterator[str]: 

193 """Combine find_prefixes and extract_suffixes.""" 

194 return extract_suffixes(find_prefixed(root, prefix), prefix) 

195 

196 

197def parse_num(maybe_num: str) -> int: 

198 """Parse number path suffixes, returns -1 on error.""" 

199 try: 

200 return int(maybe_num) 

201 except ValueError: 

202 return -1 

203 

204 

205def _force_symlink(root: Path, target: str | PurePath, link_to: str | Path) -> None: 

206 """Helper to create the current symlink. 

207 

208 It's full of race conditions that are reasonably OK to ignore 

209 for the context of best effort linking to the latest test run. 

210 

211 The presumption being that in case of much parallelism 

212 the inaccuracy is going to be acceptable. 

213 """ 

214 current_symlink = root.joinpath(target) 

215 try: 

216 current_symlink.unlink() 

217 except OSError: 

218 pass 

219 try: 

220 current_symlink.symlink_to(link_to) 

221 except Exception: 

222 pass 

223 

224 

225def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path: 

226 """Create a directory with an increased number as suffix for the given prefix.""" 

227 for i in range(10): 

228 # try up to 10 times to create the folder 

229 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) 

230 new_number = max_existing + 1 

231 new_path = root.joinpath(f"{prefix}{new_number}") 

232 try: 

233 new_path.mkdir(mode=mode) 

234 except Exception: 

235 pass 

236 else: 

237 _force_symlink(root, prefix + "current", new_path) 

238 return new_path 

239 else: 

240 raise OSError( 

241 "could not create numbered dir with prefix " 

242 f"{prefix} in {root} after 10 tries" 

243 ) 

244 

245 

246def create_cleanup_lock(p: Path) -> Path: 

247 """Create a lock to prevent premature folder cleanup.""" 

248 lock_path = get_lock_path(p) 

249 try: 

250 fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) 

251 except FileExistsError as e: 

252 raise OSError(f"cannot create lockfile in {p}") from e 

253 else: 

254 pid = os.getpid() 

255 spid = str(pid).encode() 

256 os.write(fd, spid) 

257 os.close(fd) 

258 if not lock_path.is_file(): 

259 raise OSError("lock path got renamed after successful creation") 

260 return lock_path 

261 

262 

263def register_cleanup_lock_removal( 

264 lock_path: Path, register: Any = atexit.register 

265) -> Any: 

266 """Register a cleanup function for removing a lock, by default on atexit.""" 

267 pid = os.getpid() 

268 

269 def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None: 

270 current_pid = os.getpid() 

271 if current_pid != original_pid: 

272 # fork 

273 return 

274 try: 

275 lock_path.unlink() 

276 except OSError: 

277 pass 

278 

279 return register(cleanup_on_exit) 

280 

281 

282def maybe_delete_a_numbered_dir(path: Path) -> None: 

283 """Remove a numbered directory if its lock can be obtained and it does 

284 not seem to be in use.""" 

285 path = ensure_extended_length_path(path) 

286 lock_path = None 

287 try: 

288 lock_path = create_cleanup_lock(path) 

289 parent = path.parent 

290 

291 garbage = parent.joinpath(f"garbage-{uuid.uuid4()}") 

292 path.rename(garbage) 

293 rm_rf(garbage) 

294 except OSError: 

295 # known races: 

296 # * other process did a cleanup at the same time 

297 # * deletable folder was found 

298 # * process cwd (Windows) 

299 return 

300 finally: 

301 # If we created the lock, ensure we remove it even if we failed 

302 # to properly remove the numbered dir. 

303 if lock_path is not None: 

304 try: 

305 lock_path.unlink() 

306 except OSError: 

307 pass 

308 

309 

310def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool: 

311 """Check if `path` is deletable based on whether the lock file is expired.""" 

312 if path.is_symlink(): 

313 return False 

314 lock = get_lock_path(path) 

315 try: 

316 if not lock.is_file(): 

317 return True 

318 except OSError: 

319 # we might not have access to the lock file at all, in this case assume 

320 # we don't have access to the entire directory (#7491). 

321 return False 

322 try: 

323 lock_time = lock.stat().st_mtime 

324 except Exception: 

325 return False 

326 else: 

327 if lock_time < consider_lock_dead_if_created_before: 

328 # We want to ignore any errors while trying to remove the lock such as: 

329 # - PermissionDenied, like the file permissions have changed since the lock creation; 

330 # - FileNotFoundError, in case another pytest process got here first; 

331 # and any other cause of failure. 

332 with contextlib.suppress(OSError): 

333 lock.unlink() 

334 return True 

335 return False 

336 

337 

338def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None: 

339 """Try to cleanup a folder if we can ensure it's deletable.""" 

340 if ensure_deletable(path, consider_lock_dead_if_created_before): 

341 maybe_delete_a_numbered_dir(path) 

342 

343 

344def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: 

345 """List candidates for numbered directories to be removed - follows py.path.""" 

346 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) 

347 max_delete = max_existing - keep 

348 entries = find_prefixed(root, prefix) 

349 entries, entries2 = itertools.tee(entries) 

350 numbers = map(parse_num, extract_suffixes(entries2, prefix)) 

351 for entry, number in zip(entries, numbers): 

352 if number <= max_delete: 

353 yield Path(entry) 

354 

355 

356def cleanup_dead_symlinks(root: Path) -> None: 

357 for left_dir in root.iterdir(): 

358 if left_dir.is_symlink(): 

359 if not left_dir.resolve().exists(): 

360 left_dir.unlink() 

361 

362 

363def cleanup_numbered_dir( 

364 root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float 

365) -> None: 

366 """Cleanup for lock driven numbered directories.""" 

367 if not root.exists(): 

368 return 

369 for path in cleanup_candidates(root, prefix, keep): 

370 try_cleanup(path, consider_lock_dead_if_created_before) 

371 for path in root.glob("garbage-*"): 

372 try_cleanup(path, consider_lock_dead_if_created_before) 

373 

374 cleanup_dead_symlinks(root) 

375 

376 

377def make_numbered_dir_with_cleanup( 

378 root: Path, 

379 prefix: str, 

380 keep: int, 

381 lock_timeout: float, 

382 mode: int, 

383) -> Path: 

384 """Create a numbered dir with a cleanup lock and remove old ones.""" 

385 e = None 

386 for i in range(10): 

387 try: 

388 p = make_numbered_dir(root, prefix, mode) 

389 # Only lock the current dir when keep is not 0 

390 if keep != 0: 

391 lock_path = create_cleanup_lock(p) 

392 register_cleanup_lock_removal(lock_path) 

393 except Exception as exc: 

394 e = exc 

395 else: 

396 consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout 

397 # Register a cleanup for program exit 

398 atexit.register( 

399 cleanup_numbered_dir, 

400 root, 

401 prefix, 

402 keep, 

403 consider_lock_dead_if_created_before, 

404 ) 

405 return p 

406 assert e is not None 

407 raise e 

408 

409 

410def resolve_from_str(input: str, rootpath: Path) -> Path: 

411 input = expanduser(input) 

412 input = expandvars(input) 

413 if isabs(input): 

414 return Path(input) 

415 else: 

416 return rootpath.joinpath(input) 

417 

418 

419def fnmatch_ex(pattern: str, path: str | os.PathLike[str]) -> bool: 

420 """A port of FNMatcher from py.path.common which works with PurePath() instances. 

421 

422 The difference between this algorithm and PurePath.match() is that the 

423 latter matches "**" glob expressions for each part of the path, while 

424 this algorithm uses the whole path instead. 

425 

426 For example: 

427 "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" 

428 with this algorithm, but not with PurePath.match(). 

429 

430 This algorithm was ported to keep backward-compatibility with existing 

431 settings which assume paths match according this logic. 

432 

433 References: 

434 * https://bugs.python.org/issue29249 

435 * https://bugs.python.org/issue34731 

436 """ 

437 path = PurePath(path) 

438 iswin32 = sys.platform.startswith("win") 

439 

440 if iswin32 and sep not in pattern and posix_sep in pattern: 

441 # Running on Windows, the pattern has no Windows path separators, 

442 # and the pattern has one or more Posix path separators. Replace 

443 # the Posix path separators with the Windows path separator. 

444 pattern = pattern.replace(posix_sep, sep) 

445 

446 if sep not in pattern: 

447 name = path.name 

448 else: 

449 name = str(path) 

450 if path.is_absolute() and not os.path.isabs(pattern): 

451 pattern = f"*{os.sep}{pattern}" 

452 return fnmatch.fnmatch(name, pattern) 

453 

454 

455def parts(s: str) -> set[str]: 

456 parts = s.split(sep) 

457 return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} 

458 

459 

460def symlink_or_skip( 

461 src: os.PathLike[str] | str, 

462 dst: os.PathLike[str] | str, 

463 **kwargs: Any, 

464) -> None: 

465 """Make a symlink, or skip the test in case symlinks are not supported.""" 

466 try: 

467 os.symlink(src, dst, **kwargs) 

468 except OSError as e: 

469 skip(f"symlinks not supported: {e}") 

470 

471 

472class ImportMode(Enum): 

473 """Possible values for `mode` parameter of `import_path`.""" 

474 

475 prepend = "prepend" 

476 append = "append" 

477 importlib = "importlib" 

478 

479 

480class ImportPathMismatchError(ImportError): 

481 """Raised on import_path() if there is a mismatch of __file__'s. 

482 

483 This can happen when `import_path` is called multiple times with different filenames that has 

484 the same basename but reside in packages 

485 (for example "/tests1/test_foo.py" and "/tests2/test_foo.py"). 

486 """ 

487 

488 

489def import_path( 

490 path: str | os.PathLike[str], 

491 *, 

492 mode: str | ImportMode = ImportMode.prepend, 

493 root: Path, 

494 consider_namespace_packages: bool, 

495) -> ModuleType: 

496 """ 

497 Import and return a module from the given path, which can be a file (a module) or 

498 a directory (a package). 

499 

500 :param path: 

501 Path to the file to import. 

502 

503 :param mode: 

504 Controls the underlying import mechanism that will be used: 

505 

506 * ImportMode.prepend: the directory containing the module (or package, taking 

507 `__init__.py` files into account) will be put at the *start* of `sys.path` before 

508 being imported with `importlib.import_module`. 

509 

510 * ImportMode.append: same as `prepend`, but the directory will be appended 

511 to the end of `sys.path`, if not already in `sys.path`. 

512 

513 * ImportMode.importlib: uses more fine control mechanisms provided by `importlib` 

514 to import the module, which avoids having to muck with `sys.path` at all. It effectively 

515 allows having same-named test modules in different places. 

516 

517 :param root: 

518 Used as an anchor when mode == ImportMode.importlib to obtain 

519 a unique name for the module being imported so it can safely be stored 

520 into ``sys.modules``. 

521 

522 :param consider_namespace_packages: 

523 If True, consider namespace packages when resolving module names. 

524 

525 :raises ImportPathMismatchError: 

526 If after importing the given `path` and the module `__file__` 

527 are different. Only raised in `prepend` and `append` modes. 

528 """ 

529 path = Path(path) 

530 mode = ImportMode(mode) 

531 

532 if not path.exists(): 

533 raise ImportError(path) 

534 

535 if mode is ImportMode.importlib: 

536 # Try to import this module using the standard import mechanisms, but 

537 # without touching sys.path. 

538 try: 

539 pkg_root, module_name = resolve_pkg_root_and_module_name( 

540 path, consider_namespace_packages=consider_namespace_packages 

541 ) 

542 except CouldNotResolvePathError: 

543 pass 

544 else: 

545 # If the given module name is already in sys.modules, do not import it again. 

546 with contextlib.suppress(KeyError): 

547 return sys.modules[module_name] 

548 

549 mod = _import_module_using_spec( 

550 module_name, path, pkg_root, insert_modules=False 

551 ) 

552 if mod is not None: 

553 return mod 

554 

555 # Could not import the module with the current sys.path, so we fall back 

556 # to importing the file as a single module, not being a part of a package. 

557 module_name = module_name_from_path(path, root) 

558 with contextlib.suppress(KeyError): 

559 return sys.modules[module_name] 

560 

561 mod = _import_module_using_spec( 

562 module_name, path, path.parent, insert_modules=True 

563 ) 

564 if mod is None: 

565 raise ImportError(f"Can't find module {module_name} at location {path}") 

566 return mod 

567 

568 try: 

569 pkg_root, module_name = resolve_pkg_root_and_module_name( 

570 path, consider_namespace_packages=consider_namespace_packages 

571 ) 

572 except CouldNotResolvePathError: 

573 pkg_root, module_name = path.parent, path.stem 

574 

575 # Change sys.path permanently: restoring it at the end of this function would cause surprising 

576 # problems because of delayed imports: for example, a conftest.py file imported by this function 

577 # might have local imports, which would fail at runtime if we restored sys.path. 

578 if mode is ImportMode.append: 

579 if str(pkg_root) not in sys.path: 

580 sys.path.append(str(pkg_root)) 

581 elif mode is ImportMode.prepend: 

582 if str(pkg_root) != sys.path[0]: 

583 sys.path.insert(0, str(pkg_root)) 

584 else: 

585 assert_never(mode) 

586 

587 importlib.import_module(module_name) 

588 

589 mod = sys.modules[module_name] 

590 if path.name == "__init__.py": 

591 return mod 

592 

593 ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") 

594 if ignore != "1": 

595 module_file = mod.__file__ 

596 if module_file is None: 

597 raise ImportPathMismatchError(module_name, module_file, path) 

598 

599 if module_file.endswith((".pyc", ".pyo")): 

600 module_file = module_file[:-1] 

601 if module_file.endswith(os.sep + "__init__.py"): 

602 module_file = module_file[: -(len(os.sep + "__init__.py"))] 

603 

604 try: 

605 is_same = _is_same(str(path), module_file) 

606 except FileNotFoundError: 

607 is_same = False 

608 

609 if not is_same: 

610 raise ImportPathMismatchError(module_name, module_file, path) 

611 

612 return mod 

613 

614 

615def _import_module_using_spec( 

616 module_name: str, module_path: Path, module_location: Path, *, insert_modules: bool 

617) -> ModuleType | None: 

618 """ 

619 Tries to import a module by its canonical name, path, and its parent location. 

620 

621 :param module_name: 

622 The expected module name, will become the key of `sys.modules`. 

623 

624 :param module_path: 

625 The file path of the module, for example `/foo/bar/test_demo.py`. 

626 If module is a package, pass the path to the `__init__.py` of the package. 

627 If module is a namespace package, pass directory path. 

628 

629 :param module_location: 

630 The parent location of the module. 

631 If module is a package, pass the directory containing the `__init__.py` file. 

632 

633 :param insert_modules: 

634 If True, will call `insert_missing_modules` to create empty intermediate modules 

635 with made-up module names (when importing test files not reachable from `sys.path`). 

636 

637 Example 1 of parent_module_*: 

638 

639 module_name: "a.b.c.demo" 

640 module_path: Path("a/b/c/demo.py") 

641 module_location: Path("a/b/c/") 

642 if "a.b.c" is package ("a/b/c/__init__.py" exists), then 

643 parent_module_name: "a.b.c" 

644 parent_module_path: Path("a/b/c/__init__.py") 

645 parent_module_location: Path("a/b/c/") 

646 else: 

647 parent_module_name: "a.b.c" 

648 parent_module_path: Path("a/b/c") 

649 parent_module_location: Path("a/b/") 

650 

651 Example 2 of parent_module_*: 

652 

653 module_name: "a.b.c" 

654 module_path: Path("a/b/c/__init__.py") 

655 module_location: Path("a/b/c/") 

656 if "a.b" is package ("a/b/__init__.py" exists), then 

657 parent_module_name: "a.b" 

658 parent_module_path: Path("a/b/__init__.py") 

659 parent_module_location: Path("a/b/") 

660 else: 

661 parent_module_name: "a.b" 

662 parent_module_path: Path("a/b/") 

663 parent_module_location: Path("a/") 

664 """ 

665 # Attempt to import the parent module, seems is our responsibility: 

666 # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311 

667 parent_module_name, _, name = module_name.rpartition(".") 

668 parent_module: ModuleType | None = None 

669 if parent_module_name: 

670 parent_module = sys.modules.get(parent_module_name) 

671 if parent_module is None: 

672 # Get parent_location based on location, get parent_path based on path. 

673 if module_path.name == "__init__.py": 

674 # If the current module is in a package, 

675 # need to leave the package first and then enter the parent module. 

676 parent_module_path = module_path.parent.parent 

677 else: 

678 parent_module_path = module_path.parent 

679 

680 if (parent_module_path / "__init__.py").is_file(): 

681 # If the parent module is a package, loading by __init__.py file. 

682 parent_module_path = parent_module_path / "__init__.py" 

683 

684 parent_module = _import_module_using_spec( 

685 parent_module_name, 

686 parent_module_path, 

687 parent_module_path.parent, 

688 insert_modules=insert_modules, 

689 ) 

690 

691 # Checking with sys.meta_path first in case one of its hooks can import this module, 

692 # such as our own assertion-rewrite hook. 

693 for meta_importer in sys.meta_path: 

694 spec = meta_importer.find_spec( 

695 module_name, [str(module_location), str(module_path)] 

696 ) 

697 if spec_matches_module_path(spec, module_path): 

698 break 

699 else: 

700 loader = None 

701 if module_path.is_dir(): 

702 # The `spec_from_file_location` matches a loader based on the file extension by default. 

703 # For a namespace package, need to manually specify a loader. 

704 loader = NamespaceLoader(name, module_path, PathFinder()) 

705 

706 spec = importlib.util.spec_from_file_location( 

707 module_name, str(module_path), loader=loader 

708 ) 

709 

710 if spec_matches_module_path(spec, module_path): 

711 assert spec is not None 

712 # Find spec and import this module. 

713 mod = importlib.util.module_from_spec(spec) 

714 sys.modules[module_name] = mod 

715 spec.loader.exec_module(mod) # type: ignore[union-attr] 

716 

717 # Set this module as an attribute of the parent module (#12194). 

718 if parent_module is not None: 

719 setattr(parent_module, name, mod) 

720 

721 if insert_modules: 

722 insert_missing_modules(sys.modules, module_name) 

723 return mod 

724 

725 return None 

726 

727 

728def spec_matches_module_path(module_spec: ModuleSpec | None, module_path: Path) -> bool: 

729 """Return true if the given ModuleSpec can be used to import the given module path.""" 

730 if module_spec is None: 

731 return False 

732 

733 if module_spec.origin: 

734 return Path(module_spec.origin) == module_path 

735 

736 # Compare the path with the `module_spec.submodule_Search_Locations` in case 

737 # the module is part of a namespace package. 

738 # https://docs.python.org/3/library/importlib.html#importlib.machinery.ModuleSpec.submodule_search_locations 

739 if module_spec.submodule_search_locations: # can be None. 

740 for path in module_spec.submodule_search_locations: 

741 if Path(path) == module_path: 

742 return True 

743 

744 return False 

745 

746 

747# Implement a special _is_same function on Windows which returns True if the two filenames 

748# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678). 

749if sys.platform.startswith("win"): 

750 

751 def _is_same(f1: str, f2: str) -> bool: 

752 return Path(f1) == Path(f2) or os.path.samefile(f1, f2) 

753 

754else: 

755 

756 def _is_same(f1: str, f2: str) -> bool: 

757 return os.path.samefile(f1, f2) 

758 

759 

760def module_name_from_path(path: Path, root: Path) -> str: 

761 """ 

762 Return a dotted module name based on the given path, anchored on root. 

763 

764 For example: path="projects/src/tests/test_foo.py" and root="/projects", the 

765 resulting module name will be "src.tests.test_foo". 

766 """ 

767 path = path.with_suffix("") 

768 try: 

769 relative_path = path.relative_to(root) 

770 except ValueError: 

771 # If we can't get a relative path to root, use the full path, except 

772 # for the first part ("d:\\" or "/" depending on the platform, for example). 

773 path_parts = path.parts[1:] 

774 else: 

775 # Use the parts for the relative path to the root path. 

776 path_parts = relative_path.parts 

777 

778 # Module name for packages do not contain the __init__ file, unless 

779 # the `__init__.py` file is at the root. 

780 if len(path_parts) >= 2 and path_parts[-1] == "__init__": 

781 path_parts = path_parts[:-1] 

782 

783 # Module names cannot contain ".", normalize them to "_". This prevents 

784 # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules. 

785 # Also, important to replace "." at the start of paths, as those are considered relative imports. 

786 path_parts = tuple(x.replace(".", "_") for x in path_parts) 

787 

788 return ".".join(path_parts) 

789 

790 

791def insert_missing_modules(modules: dict[str, ModuleType], module_name: str) -> None: 

792 """ 

793 Used by ``import_path`` to create intermediate modules when using mode=importlib. 

794 

795 When we want to import a module as "src.tests.test_foo" for example, we need 

796 to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo", 

797 otherwise "src.tests.test_foo" is not importable by ``__import__``. 

798 """ 

799 module_parts = module_name.split(".") 

800 while module_name: 

801 parent_module_name, _, child_name = module_name.rpartition(".") 

802 if parent_module_name: 

803 parent_module = modules.get(parent_module_name) 

804 if parent_module is None: 

805 try: 

806 # If sys.meta_path is empty, calling import_module will issue 

807 # a warning and raise ModuleNotFoundError. To avoid the 

808 # warning, we check sys.meta_path explicitly and raise the error 

809 # ourselves to fall back to creating a dummy module. 

810 if not sys.meta_path: 

811 raise ModuleNotFoundError 

812 parent_module = importlib.import_module(parent_module_name) 

813 except ModuleNotFoundError: 

814 parent_module = ModuleType( 

815 module_name, 

816 doc="Empty module created by pytest's importmode=importlib.", 

817 ) 

818 modules[parent_module_name] = parent_module 

819 

820 # Add child attribute to the parent that can reference the child 

821 # modules. 

822 if not hasattr(parent_module, child_name): 

823 setattr(parent_module, child_name, modules[module_name]) 

824 

825 module_parts.pop(-1) 

826 module_name = ".".join(module_parts) 

827 

828 

829def resolve_package_path(path: Path) -> Path | None: 

830 """Return the Python package path by looking for the last 

831 directory upwards which still contains an __init__.py. 

832 

833 Returns None if it cannot be determined. 

834 """ 

835 result = None 

836 for parent in itertools.chain((path,), path.parents): 

837 if parent.is_dir(): 

838 if not (parent / "__init__.py").is_file(): 

839 break 

840 if not parent.name.isidentifier(): 

841 break 

842 result = parent 

843 return result 

844 

845 

846def resolve_pkg_root_and_module_name( 

847 path: Path, *, consider_namespace_packages: bool = False 

848) -> tuple[Path, str]: 

849 """ 

850 Return the path to the directory of the root package that contains the 

851 given Python file, and its module name: 

852 

853 src/ 

854 app/ 

855 __init__.py 

856 core/ 

857 __init__.py 

858 models.py 

859 

860 Passing the full path to `models.py` will yield Path("src") and "app.core.models". 

861 

862 If consider_namespace_packages is True, then we additionally check upwards in the hierarchy 

863 for namespace packages: 

864 

865 https://packaging.python.org/en/latest/guides/packaging-namespace-packages 

866 

867 Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files). 

868 """ 

869 pkg_root: Path | None = None 

870 pkg_path = resolve_package_path(path) 

871 if pkg_path is not None: 

872 pkg_root = pkg_path.parent 

873 if consider_namespace_packages: 

874 start = pkg_root if pkg_root is not None else path.parent 

875 for candidate in (start, *start.parents): 

876 module_name = compute_module_name(candidate, path) 

877 if module_name and is_importable(module_name, path): 

878 # Point the pkg_root to the root of the namespace package. 

879 pkg_root = candidate 

880 break 

881 

882 if pkg_root is not None: 

883 module_name = compute_module_name(pkg_root, path) 

884 if module_name: 

885 return pkg_root, module_name 

886 

887 raise CouldNotResolvePathError(f"Could not resolve for {path}") 

888 

889 

890def is_importable(module_name: str, module_path: Path) -> bool: 

891 """ 

892 Return if the given module path could be imported normally by Python, akin to the user 

893 entering the REPL and importing the corresponding module name directly, and corresponds 

894 to the module_path specified. 

895 

896 :param module_name: 

897 Full module name that we want to check if is importable. 

898 For example, "app.models". 

899 

900 :param module_path: 

901 Full path to the python module/package we want to check if is importable. 

902 For example, "/projects/src/app/models.py". 

903 """ 

904 try: 

905 # Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through 

906 # sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``). 

907 # Using importlib.util.find_spec() is different, it gives the same results as trying to import 

908 # the module normally in the REPL. 

909 spec = importlib.util.find_spec(module_name) 

910 except (ImportError, ValueError, ImportWarning): 

911 return False 

912 else: 

913 return spec_matches_module_path(spec, module_path) 

914 

915 

916def compute_module_name(root: Path, module_path: Path) -> str | None: 

917 """Compute a module name based on a path and a root anchor.""" 

918 try: 

919 path_without_suffix = module_path.with_suffix("") 

920 except ValueError: 

921 # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter). 

922 return None 

923 

924 try: 

925 relative = path_without_suffix.relative_to(root) 

926 except ValueError: # pragma: no cover 

927 return None 

928 names = list(relative.parts) 

929 if not names: 

930 return None 

931 if names[-1] == "__init__": 

932 names.pop() 

933 return ".".join(names) 

934 

935 

936class CouldNotResolvePathError(Exception): 

937 """Custom exception raised by resolve_pkg_root_and_module_name.""" 

938 

939 

940def scandir( 

941 path: str | os.PathLike[str], 

942 sort_key: Callable[[os.DirEntry[str]], object] = lambda entry: entry.name, 

943) -> list[os.DirEntry[str]]: 

944 """Scan a directory recursively, in breadth-first order. 

945 

946 The returned entries are sorted according to the given key. 

947 The default is to sort by name. 

948 """ 

949 entries = [] 

950 with os.scandir(path) as s: 

951 # Skip entries with symlink loops and other brokenness, so the caller 

952 # doesn't have to deal with it. 

953 for entry in s: 

954 try: 

955 entry.is_file() 

956 except OSError as err: 

957 if _ignore_error(err): 

958 continue 

959 raise 

960 entries.append(entry) 

961 entries.sort(key=sort_key) # type: ignore[arg-type] 

962 return entries 

963 

964 

965def visit( 

966 path: str | os.PathLike[str], recurse: Callable[[os.DirEntry[str]], bool] 

967) -> Iterator[os.DirEntry[str]]: 

968 """Walk a directory recursively, in breadth-first order. 

969 

970 The `recurse` predicate determines whether a directory is recursed. 

971 

972 Entries at each directory level are sorted. 

973 """ 

974 entries = scandir(path) 

975 yield from entries 

976 for entry in entries: 

977 if entry.is_dir() and recurse(entry): 

978 yield from visit(entry.path, recurse) 

979 

980 

981def absolutepath(path: str | os.PathLike[str]) -> Path: 

982 """Convert a path to an absolute path using os.path.abspath. 

983 

984 Prefer this over Path.resolve() (see #6523). 

985 Prefer this over Path.absolute() (not public, doesn't normalize). 

986 """ 

987 return Path(os.path.abspath(path)) 

988 

989 

990def commonpath(path1: Path, path2: Path) -> Path | None: 

991 """Return the common part shared with the other path, or None if there is 

992 no common part. 

993 

994 If one path is relative and one is absolute, returns None. 

995 """ 

996 try: 

997 return Path(os.path.commonpath((str(path1), str(path2)))) 

998 except ValueError: 

999 return None 

1000 

1001 

1002def bestrelpath(directory: Path, dest: Path) -> str: 

1003 """Return a string which is a relative path from directory to dest such 

1004 that directory/bestrelpath == dest. 

1005 

1006 The paths must be either both absolute or both relative. 

1007 

1008 If no such path can be determined, returns dest. 

1009 """ 

1010 assert isinstance(directory, Path) 

1011 assert isinstance(dest, Path) 

1012 if dest == directory: 

1013 return os.curdir 

1014 # Find the longest common directory. 

1015 base = commonpath(directory, dest) 

1016 # Can be the case on Windows for two absolute paths on different drives. 

1017 # Can be the case for two relative paths without common prefix. 

1018 # Can be the case for a relative path and an absolute path. 

1019 if not base: 

1020 return str(dest) 

1021 reldirectory = directory.relative_to(base) 

1022 reldest = dest.relative_to(base) 

1023 return os.path.join( 

1024 # Back from directory to base. 

1025 *([os.pardir] * len(reldirectory.parts)), 

1026 # Forward from base to dest. 

1027 *reldest.parts, 

1028 ) 

1029 

1030 

1031def safe_exists(p: Path) -> bool: 

1032 """Like Path.exists(), but account for input arguments that might be too long (#11394).""" 

1033 try: 

1034 return p.exists() 

1035 except (ValueError, OSError): 

1036 # ValueError: stat: path too long for Windows 

1037 # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect 

1038 return False