Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/_pytest/pathlib.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

459 statements  

1from __future__ import annotations 

2 

3import atexit 

4import contextlib 

5from enum import Enum 

6from errno import EBADF 

7from errno import ELOOP 

8from errno import ENOENT 

9from errno import ENOTDIR 

10import fnmatch 

11from functools import partial 

12from importlib.machinery import ModuleSpec 

13import importlib.util 

14import itertools 

15import os 

16from os.path import expanduser 

17from os.path import expandvars 

18from os.path import isabs 

19from os.path import sep 

20from pathlib import Path 

21from pathlib import PurePath 

22from posixpath import sep as posix_sep 

23import shutil 

24import sys 

25import types 

26from types import ModuleType 

27from typing import Any 

28from typing import Callable 

29from typing import Iterable 

30from typing import Iterator 

31from typing import TypeVar 

32import uuid 

33import warnings 

34 

35from _pytest.compat import assert_never 

36from _pytest.outcomes import skip 

37from _pytest.warning_types import PytestWarning 

38 

39 

# Three days' worth of seconds.
LOCK_TIMEOUT = 3 * 24 * 60 * 60


_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)

# The error-code tables below, together with their comments, were copied
# from the cpython 3.9 Lib/pathlib.py file.

# errno values that are tolerated rather than reported.
# EBADF is listed to guard against macOS `stat` throwing EBADF.
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)

# Windows error codes that are tolerated in the same way.
_IGNORED_WINERRORS = (
    21,  # ERROR_NOT_READY - drive exists but is not accessible
    1921,  # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
)

55 

56 

def _ignore_error(exception: Exception) -> bool:
    """Tell whether ``exception`` carries one of the tolerated error codes.

    The exception's ``errno`` attribute is looked up in ``_IGNORED_ERRORS``
    and its ``winerror`` attribute in ``_IGNORED_WINERRORS``. A missing
    attribute is treated as ``None`` and so never matches.
    """
    if getattr(exception, "errno", None) in _IGNORED_ERRORS:
        return True
    return getattr(exception, "winerror", None) in _IGNORED_WINERRORS

62 

63 

def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """Return the ``.lock`` entry located directly inside ``path``."""
    return path / ".lock"

66 

67 

def on_rm_rf_error(
    func: Callable[..., Any] | None,
    path: str,
    excinfo: BaseException
    | tuple[type[BaseException], BaseException, types.TracebackType | None],
    *,
    start_path: Path,
) -> bool:
    """Handle known read-only errors during rmtree.

    :param func:
        The function that failed. A retry is attempted only for
        ``os.rmdir``, ``os.remove`` and ``os.unlink``.
    :param path:
        The path ``func`` was called with.
    :param excinfo:
        Either the exception instance, or a 3-tuple whose second item is
        the exception instance.
    :param start_path:
        The path originally passed to ``rm_rf``; the upward chmod walk stops
        once this directory has been processed.
    :returns:
        True when permissions were adjusted and ``func`` was called again,
        False otherwise. The returned value is used only by our own tests.
    """
    import stat

    exc = excinfo if isinstance(excinfo, BaseException) else excinfo[1]

    # Another process removed the file in the middle of the "rm_rf" (xdist for example).
    # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(exc, FileNotFoundError):
        return False

    # Anything other than a permission problem is reported and not retried.
    if not isinstance(exc, PermissionError):
        warnings.warn(
            PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}")
        )
        return False

    retryable = (os.rmdir, os.remove, os.unlink)
    if func not in retryable:
        # No warning is emitted when the failing function is os.open.
        if func not in (os.open,):
            warnings.warn(
                PytestWarning(
                    f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
                )
            )
        return False

    # Chmod + retry.
    def make_user_rw(target: str) -> None:
        current = os.stat(target).st_mode
        os.chmod(target, current | stat.S_IRUSR | stat.S_IWUSR)

    # For files, we need to recursively go upwards in the directories to
    # ensure they all are also writable.
    file_path = Path(path)
    if file_path.is_file():
        for ancestor in file_path.parents:
            make_user_rw(str(ancestor))
            # Stop when we reach the original path passed to rm_rf.
            if ancestor == start_path:
                break
    make_user_rw(str(path))

    func(path)
    return True

125 

126 

def ensure_extended_length_path(path: Path) -> Path:
    """Get the extended-length version of a path (Windows).

    Windows limits paths to MAX_PATH (260 characters) by default, and
    operations on longer paths fail. Converting a path to its
    "extended-length" form before the operation lifts that limit:
    https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation

    :returns:
        On Windows, the resolved path in extended-length form.
        On every other platform, ``path`` unchanged.
    """
    if not sys.platform.startswith("win32"):
        return path
    resolved = str(path.resolve())
    return Path(get_extended_length_path_str(resolved))

143 

144 

def get_extended_length_path_str(path: str) -> str:
    """Convert a path to a Windows extended length path.

    Paths that already carry an extended-length prefix are returned as-is.
    Paths starting with two backslashes get the UNC extended-length prefix
    in place of those two backslashes. Every other path gets the plain
    extended-length prefix prepended.
    """
    extended = "\\\\?\\"
    extended_unc = "\\\\?\\UNC\\"
    already_extended = path.startswith((extended, extended_unc))
    if already_extended:
        return path
    is_unc = path.startswith("\\\\")
    if is_unc:
        return extended_unc + path[2:]
    return extended + path

155 

156 

def rm_rf(path: Path) -> None:
    """Recursively delete ``path``, even if some elements are read-only.

    The path is first converted with ``ensure_extended_length_path``, then
    removed with ``shutil.rmtree`` using ``on_rm_rf_error`` as the error
    callback.
    """
    target = ensure_extended_length_path(path)
    handler = partial(on_rm_rf_error, start_path=target)
    # The callback is passed as ``onexc`` on Python 3.12+ and as ``onerror`` before.
    callback_keyword = "onexc" if sys.version_info >= (3, 12) else "onerror"
    shutil.rmtree(str(target), **{callback_keyword: handler})

166 

167 

def find_prefixed(root: Path, prefix: str) -> Iterator[os.DirEntry[str]]:
    """Find all elements in root that begin with the prefix, case-insensitive.

    :param root: Directory whose direct entries are inspected.
    :param prefix: Prefix the entry names are compared against, ignoring case.
    :returns: A lazy iterator over the matching ``os.DirEntry`` objects.

    The directory scan is wrapped in a ``with`` block so the underlying
    iterator is closed when the generator finishes or is closed early.
    """
    lowered_prefix = prefix.lower()
    with os.scandir(root) as entries:
        for entry in entries:
            if entry.name.lower().startswith(lowered_prefix):
                yield entry

174 

175 

def extract_suffixes(iter: Iterable[os.DirEntry[str]], prefix: str) -> Iterator[str]:
    """Yield each entry's name with the first ``len(prefix)`` characters removed.

    The names are not checked against ``prefix``; only its length is used.

    :param iter: Iterable of objects exposing a ``name`` attribute.
    :param prefix: Expected prefix of the names.
    """
    offset = len(prefix)
    yield from (item.name[offset:] for item in iter)

185 

186 

def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """Combine find_prefixed and extract_suffixes.

    Yields what follows ``prefix`` in the name of every entry of ``root``
    selected by ``find_prefixed``.
    """
    matching_entries = find_prefixed(root, prefix)
    return extract_suffixes(matching_entries, prefix)

190 

191 

def parse_num(maybe_num: str) -> int:
    """Parse a numeric path suffix; anything ``int()`` rejects yields -1."""
    with contextlib.suppress(ValueError):
        return int(maybe_num)
    return -1

198 

199 

def _force_symlink(root: Path, target: str | PurePath, link_to: str | Path) -> None:
    """Helper to create the current symlink.

    Any existing ``root/target`` is unlinked first, then a symlink pointing
    at ``link_to`` is created there. Failures of either step are ignored.

    It's full of race conditions that are reasonably OK to ignore
    for the context of best effort linking to the latest test run.

    The presumption being that in case of much parallelism
    the inaccuracy is going to be acceptable.
    """
    link = root / target
    # Best effort: the old link might not exist or might not be removable.
    with contextlib.suppress(OSError):
        link.unlink()
    # Best effort: creating the link may fail for any reason.
    with contextlib.suppress(Exception):
        link.symlink_to(link_to)

218 

219 

def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
    """Create a directory with an increased number as suffix for the given prefix.

    :param root: Directory in which the numbered directory is created.
    :param prefix: Name prefix; the new directory is named ``<prefix><number>``.
    :param mode: Mode passed to ``mkdir``.
    :returns: The path of the newly created directory.
    :raises OSError: If no directory could be created after 10 attempts.
    """
    for _attempt in range(10):
        # Recompute the highest number on every attempt: a failed mkdir may
        # mean the number was taken since the last scan.
        highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        candidate = root.joinpath(f"{prefix}{highest + 1}")
        try:
            candidate.mkdir(mode=mode)
        except Exception:
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate
    raise OSError(
        "could not create numbered dir with prefix "
        f"{prefix} in {root} after 10 tries"
    )

239 

240 

def create_cleanup_lock(p: Path) -> Path:
    """Create a lock to prevent premature folder cleanup.

    The lock file is created exclusively inside ``p`` and the current
    process id is written into it.

    :param p: Directory to protect.
    :returns: The path of the created lock file.
    :raises OSError:
        If the lock file already exists, or if it is no longer a file right
        after having been created.
    """
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise OSError(f"cannot create lockfile in {p}") from e
    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # Release the descriptor even when the write fails, so it never leaks.
        os.close(fd)
    if not lock_path.is_file():
        raise OSError("lock path got renamed after successful creation")
    return lock_path

256 

257 

def register_cleanup_lock_removal(
    lock_path: Path, register: Any = atexit.register
) -> Any:
    """Register a cleanup function for removing a lock, by default on atexit.

    :param lock_path: The lock file to remove.
    :param register: Callable receiving the cleanup function.
    :returns: Whatever ``register`` returns.
    """
    registering_pid = os.getpid()

    def cleanup_on_exit(
        lock_path: Path = lock_path, original_pid: int = registering_pid
    ) -> None:
        # Only the process that registered the cleanup may remove the lock;
        # a differing pid means we are running in a fork.
        if os.getpid() != original_pid:
            return
        with contextlib.suppress(OSError):
            lock_path.unlink()

    return register(cleanup_on_exit)

275 

276 

def maybe_delete_a_numbered_dir(path: Path) -> None:
    """Remove a numbered directory if its lock can be obtained and it does
    not seem to be in use.

    The directory is locked, renamed to a uniquely named ``garbage-<uuid>``
    sibling and then removed. Any ``OSError`` along the way is swallowed.
    """
    path = ensure_extended_length_path(path)
    lock_path = None
    try:
        lock_path = create_cleanup_lock(path)
        garbage = path.parent / f"garbage-{uuid.uuid4()}"
        path.rename(garbage)
        rm_rf(garbage)
    except OSError:
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        pass
    finally:
        # If we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir.
        if lock_path is not None:
            with contextlib.suppress(OSError):
                lock_path.unlink()

303 

304 

def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """Check if `path` is deletable based on whether the lock file is expired.

    :param path: Candidate directory.
    :param consider_lock_dead_if_created_before:
        Timestamp compared against the lock file's ``st_mtime``; a lock
        older than this is removed.
    :returns:
        True if there is no lock file, or if an expired lock was removed
        successfully. False for symlinks, inaccessible locks, locks that are
        not expired, and expired locks that could not be removed.
    """
    if path.is_symlink():
        return False

    lock = get_lock_path(path)
    try:
        has_lock = lock.is_file()
    except OSError:
        # we might not have access to the lock file at all, in this case assume
        # we don't have access to the entire directory (#7491).
        return False
    if not has_lock:
        return True

    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False

    if lock_time < consider_lock_dead_if_created_before:
        # We want to ignore any errors while trying to remove the lock such as:
        # - PermissionDenied, like the file permissions have changed since the lock creation;
        # - FileNotFoundError, in case another pytest process got here first;
        # and any other cause of failure.
        try:
            lock.unlink()
        except OSError:
            pass
        else:
            return True
    return False

331 

332 

def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """Delete ``path`` only when ``ensure_deletable`` approves it."""
    if not ensure_deletable(path, consider_lock_dead_if_created_before):
        return
    maybe_delete_a_numbered_dir(path)

337 

338 

def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """List candidates for numbered directories to be removed - follows py.path.

    :param root: Directory holding the numbered directories.
    :param prefix: Name prefix of the numbered directories.
    :param keep: How many of the highest numbers are spared.
    :returns:
        Paths of prefixed entries whose parsed number is at most
        ``highest - keep``. Entries with a non-numeric suffix parse as -1.
    """
    highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = highest - keep
    prefix_length = len(prefix)
    for entry in find_prefixed(root, prefix):
        if parse_num(entry.name[prefix_length:]) <= threshold:
            yield Path(entry)

349 

350 

def cleanup_dead_symlinks(root: Path) -> None:
    """Unlink every symlink directly inside ``root`` whose resolved target
    does not exist."""
    for child in root.iterdir():
        if child.is_symlink() and not child.resolve().exists():
            child.unlink()

356 

357 

def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """Cleanup for lock driven numbered directories.

    Does nothing when ``root`` does not exist. Otherwise tries to clean up
    the candidates reported by ``cleanup_candidates``, then any leftover
    ``garbage-*`` entries, and finally removes dead symlinks.
    """
    if not root.exists():
        return

    for candidate in cleanup_candidates(root, prefix, keep):
        try_cleanup(candidate, consider_lock_dead_if_created_before)

    for leftover in root.glob("garbage-*"):
        try_cleanup(leftover, consider_lock_dead_if_created_before)

    cleanup_dead_symlinks(root)

370 

371 

def make_numbered_dir_with_cleanup(
    root: Path,
    prefix: str,
    keep: int,
    lock_timeout: float,
    mode: int,
) -> Path:
    """Create a numbered dir with a cleanup lock and remove old ones.

    :param root: Directory in which the numbered directory is created.
    :param prefix: Name prefix of the numbered directories.
    :param keep: Passed on to the exit-time cleanup; 0 disables locking of the new directory.
    :param lock_timeout:
        Subtracted from the new directory's ``st_mtime`` to obtain the
        threshold handed to the exit-time cleanup.
    :param mode: Mode for the new directory.
    :returns: The path of the newly created directory.
    :raises Exception: The last error seen, if all 10 attempts fail.
    """
    last_error = None
    for _attempt in range(10):
        try:
            new_dir = make_numbered_dir(root, prefix, mode)
            # Only lock the current dir when keep is not 0
            if keep != 0:
                register_cleanup_lock_removal(create_cleanup_lock(new_dir))
        except Exception as exc:
            last_error = exc
            continue

        consider_lock_dead_if_created_before = new_dir.stat().st_mtime - lock_timeout
        # Register a cleanup for program exit
        atexit.register(
            cleanup_numbered_dir,
            root,
            prefix,
            keep,
            consider_lock_dead_if_created_before,
        )
        return new_dir

    assert last_error is not None
    raise last_error

403 

404 

def resolve_from_str(input: str, rootpath: Path) -> Path:
    """Turn a user-supplied path string into a ``Path``.

    ``~`` and environment variables are expanded first. An absolute result
    is returned as-is; a relative one is joined onto ``rootpath``.
    """
    expanded = expandvars(expanduser(input))
    return Path(expanded) if isabs(expanded) else rootpath.joinpath(expanded)

412 

413 

def fnmatch_ex(pattern: str, path: str | os.PathLike[str]) -> bool:
    """A port of FNMatcher from py.path.common which works with PurePath() instances.

    Unlike PurePath.match(), which matches "**" glob expressions against
    each part of the path, this algorithm matches against the whole path.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    This algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according to this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731

    :param pattern: An fnmatch-style pattern.
    :param path: The path to test.
    :returns: True if the path matches the pattern.
    """
    path = PurePath(path)

    on_windows = sys.platform.startswith("win")
    if on_windows and posix_sep in pattern and sep not in pattern:
        # Running on Windows, the pattern has no Windows path separators,
        # and the pattern has one or more Posix path separators. Replace
        # the Posix path separators with the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    # A pattern without any separator is matched against the base name only.
    if sep not in pattern:
        return fnmatch.fnmatch(path.name, pattern)

    # A relative pattern may match anywhere inside an absolute path.
    if path.is_absolute() and not os.path.isabs(pattern):
        pattern = f"*{os.sep}{pattern}"
    return fnmatch.fnmatch(str(path), pattern)

448 

449 

def parts(s: str) -> set[str]:
    """Return every leading sub-path of ``s``, split on the OS separator.

    An empty leading piece (as produced by a path starting with the
    separator) is represented by the separator itself.
    """
    pieces = s.split(sep)
    prefixes = set()
    for end in range(1, len(pieces) + 1):
        prefixes.add(sep.join(pieces[:end]) or sep)
    return prefixes

453 

454 

def symlink_or_skip(
    src: os.PathLike[str] | str,
    dst: os.PathLike[str] | str,
    **kwargs: Any,
) -> None:
    """Make a symlink, or skip the test in case symlinks are not supported.

    :param src: The symlink target.
    :param dst: Where the symlink is created.
    :param kwargs: Forwarded to ``os.symlink``.
    """
    try:
        os.symlink(src, dst, **kwargs)
    except OSError as error:
        # Any OSError from os.symlink is taken to mean "not supported".
        skip(f"symlinks not supported: {error}")

465 

466 

class ImportMode(Enum):
    """Possible values for `mode` parameter of `import_path`."""

    # Put the module's root directory at the start of sys.path.
    prepend = "prepend"
    # Put the module's root directory at the end of sys.path.
    append = "append"
    # Import through importlib machinery without changing sys.path.
    importlib = "importlib"

473 

474 

class ImportPathMismatchError(ImportError):
    """Raised on import_path() if there is a mismatch of __file__'s.

    This can happen when `import_path` is called multiple times with
    different filenames that have the same basename but reside in different
    packages (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
    """

482 

483 

def import_path(
    path: str | os.PathLike[str],
    *,
    mode: str | ImportMode = ImportMode.prepend,
    root: Path,
    consider_namespace_packages: bool,
) -> ModuleType:
    """
    Import and return a module from the given path, which can be a file (a module) or
    a directory (a package).

    :param path:
        Path to the file to import.

    :param mode:
        Controls the underlying import mechanism that will be used:

        * ImportMode.prepend: the directory containing the module (or package, taking
          `__init__.py` files into account) will be put at the *start* of `sys.path` before
          being imported with `importlib.import_module`.

        * ImportMode.append: same as `prepend`, but the directory will be appended
          to the end of `sys.path`, if not already in `sys.path`.

        * ImportMode.importlib: uses more fine control mechanisms provided by `importlib`
          to import the module, which avoids having to muck with `sys.path` at all. It effectively
          allows having same-named test modules in different places.

    :param root:
        Used as an anchor when mode == ImportMode.importlib to obtain
        a unique name for the module being imported so it can safely be stored
        into ``sys.modules``.

    :param consider_namespace_packages:
        If True, consider namespace packages when resolving module names.

    :raises ImportError:
        If `path` does not exist, or if the module cannot be found in `importlib` mode.

    :raises ImportPathMismatchError:
        If after importing the given `path` and the module `__file__`
        are different. Only raised in `prepend` and `append` modes.
    """
    path = Path(path)
    mode = ImportMode(mode)

    if not path.exists():
        raise ImportError(path)

    if mode is ImportMode.importlib:
        # Try to import this module using the standard import mechanisms, but
        # without touching sys.path.
        try:
            pkg_root, module_name = resolve_pkg_root_and_module_name(
                path, consider_namespace_packages=consider_namespace_packages
            )
        except CouldNotResolvePathError:
            pass
        else:
            # If the given module name is already in sys.modules, do not import it again.
            try:
                return sys.modules[module_name]
            except KeyError:
                pass

            imported = _import_module_using_spec(
                module_name, path, pkg_root, insert_modules=False
            )
            if imported is not None:
                return imported

        # Could not import the module with the current sys.path, so we fall back
        # to importing the file as a single module, not being a part of a package.
        module_name = module_name_from_path(path, root)
        try:
            return sys.modules[module_name]
        except KeyError:
            pass

        imported = _import_module_using_spec(
            module_name, path, path.parent, insert_modules=True
        )
        if imported is None:
            raise ImportError(f"Can't find module {module_name} at location {path}")
        return imported

    try:
        pkg_root, module_name = resolve_pkg_root_and_module_name(
            path, consider_namespace_packages=consider_namespace_packages
        )
    except CouldNotResolvePathError:
        pkg_root, module_name = path.parent, path.stem

    # Change sys.path permanently: restoring it at the end of this function would cause surprising
    # problems because of delayed imports: for example, a conftest.py file imported by this function
    # might have local imports, which would fail at runtime if we restored sys.path.
    pkg_root_str = str(pkg_root)
    if mode is ImportMode.append:
        if pkg_root_str not in sys.path:
            sys.path.append(pkg_root_str)
    elif mode is ImportMode.prepend:
        if pkg_root_str != sys.path[0]:
            sys.path.insert(0, pkg_root_str)
    else:
        assert_never(mode)

    importlib.import_module(module_name)

    mod = sys.modules[module_name]
    # Packages imported through their __init__.py are not checked for mismatches.
    if path.name == "__init__.py":
        return mod

    # The mismatch check can be switched off through the environment.
    if os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") == "1":
        return mod

    module_file = mod.__file__
    if module_file is None:
        raise ImportPathMismatchError(module_name, module_file, path)

    # Normalize the module file before comparing: drop the last character of
    # a .pyc/.pyo name, and reduce a package's __init__.py to its directory.
    if module_file.endswith((".pyc", ".pyo")):
        module_file = module_file[:-1]
    init_tail = os.sep + "__init__.py"
    if module_file.endswith(init_tail):
        module_file = module_file[: -(len(init_tail))]

    try:
        is_same = _is_same(str(path), module_file)
    except FileNotFoundError:
        is_same = False

    if not is_same:
        raise ImportPathMismatchError(module_name, module_file, path)

    return mod

608 

609 

def _import_module_using_spec(
    module_name: str, module_path: Path, module_location: Path, *, insert_modules: bool
) -> ModuleType | None:
    """
    Tries to import a module by its canonical name, path to the .py file, and its
    parent location.

    :param module_name:
        Dotted name under which the module is stored in ``sys.modules``.
    :param module_path:
        Path the module's spec must originate from.
    :param module_location:
        Directory handed to the ``sys.meta_path`` finders as search path.
    :param insert_modules:
        If True, will call insert_missing_modules to create empty intermediate modules
        for made-up module names (when importing test files not reachable from sys.path).
    :returns:
        The imported module, or None if no spec matching ``module_path`` was found.
    """
    # Checking with sys.meta_path first in case one of its hooks can import this module,
    # such as our own assertion-rewrite hook.
    for finder in sys.meta_path:
        spec = finder.find_spec(module_name, [str(module_location)])
        if spec_matches_module_path(spec, module_path):
            break
    else:
        spec = importlib.util.spec_from_file_location(module_name, str(module_path))

    if not spec_matches_module_path(spec, module_path):
        return None
    assert spec is not None

    # Attempt to import the parent module, seems is our responsibility:
    # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311
    parent_module_name, _, name = module_name.rpartition(".")
    parent_module: ModuleType | None = None
    if parent_module_name:
        parent_module = sys.modules.get(parent_module_name)
        if parent_module is None:
            # Find the directory of this module's parent.
            if module_path.name == "__init__.py":
                parent_dir = module_path.parent.parent
            else:
                parent_dir = module_path.parent
            # Consider the parent module path as its __init__.py file, if it has one.
            parent_init = parent_dir / "__init__.py"
            parent_module_path = parent_init if parent_init.is_file() else parent_dir
            parent_module = _import_module_using_spec(
                parent_module_name,
                parent_module_path,
                parent_dir,
                insert_modules=insert_modules,
            )

    # Find spec and import this module.
    mod = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = mod
    spec.loader.exec_module(mod)  # type: ignore[union-attr]

    # Set this module as an attribute of the parent module (#12194).
    if parent_module is not None:
        setattr(parent_module, name, mod)

    if insert_modules:
        insert_missing_modules(sys.modules, module_name)
    return mod

672 

673 

def spec_matches_module_path(module_spec: ModuleSpec | None, module_path: Path) -> bool:
    """Return true if the given ModuleSpec can be used to import the given module path.

    A missing spec, or a spec without an origin, never matches. Otherwise
    the origin is compared with ``module_path`` as a ``Path``.
    """
    origin = None if module_spec is None else module_spec.origin
    if origin is None:
        return False
    return Path(origin) == module_path

680 

681 

# On Windows, os.path.samefile returns False for mounts in UNC (#7678). To
# circumvent that, the Windows variant of _is_same first checks whether the
# two filenames compare equal as paths, and only then falls back to samefile.
if sys.platform.startswith("win"):

    def _is_same(f1: str, f2: str) -> bool:
        """Return True if both names compare equal as paths or refer to the same file."""
        if Path(f1) == Path(f2):
            return True
        return os.path.samefile(f1, f2)

else:

    def _is_same(f1: str, f2: str) -> bool:
        """Return True if both names refer to the same file."""
        return os.path.samefile(f1, f2)

693 

694 

def module_name_from_path(path: Path, root: Path) -> str:
    """
    Return a dotted module name based on the given path, anchored on root.

    For example: path="/projects/src/tests/test_foo.py" and root="/projects", the
    resulting module name will be "src.tests.test_foo".
    """
    stem_path = path.with_suffix("")
    try:
        # Use the parts for the relative path to the root path.
        name_parts = stem_path.relative_to(root).parts
    except ValueError:
        # If we can't get a relative path to root, use the full path, except
        # for the first part ("d:\\" or "/" depending on the platform, for example).
        name_parts = stem_path.parts[1:]

    # Module name for packages do not contain the __init__ file, unless
    # the `__init__.py` file is at the root.
    if len(name_parts) >= 2 and name_parts[-1] == "__init__":
        name_parts = name_parts[:-1]

    # Module names cannot contain ".", normalize them to "_". This prevents
    # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules.
    # Also, important to replace "." at the start of paths, as those are considered relative imports.
    return ".".join(part.replace(".", "_") for part in name_parts)

724 

725 

def insert_missing_modules(modules: dict[str, ModuleType], module_name: str) -> None:
    """
    Used by ``import_path`` to create intermediate modules when using mode=importlib.

    When we want to import a module as "src.tests.test_foo" for example, we need
    to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
    otherwise "src.tests.test_foo" is not importable by ``__import__``.

    :param modules:
        The module mapping to fill in (normally ``sys.modules``). It must
        already contain ``module_name``.
    :param module_name:
        Dotted name of the module whose parents should be made available.
        An empty name is a no-op.
    """
    module_parts = module_name.split(".")
    while module_name:
        parent_module_name, _, child_name = module_name.rpartition(".")
        if parent_module_name:
            parent_module = modules.get(parent_module_name)
            if parent_module is None:
                try:
                    # If sys.meta_path is empty, calling import_module will issue
                    # a warning and raise ModuleNotFoundError. To avoid the
                    # warning, we check sys.meta_path explicitly and raise the error
                    # ourselves to fall back to creating a dummy module.
                    if not sys.meta_path:
                        raise ModuleNotFoundError
                    parent_module = importlib.import_module(parent_module_name)
                except ModuleNotFoundError:
                    # The placeholder stands in for the parent, so it must be
                    # named after the parent, not after the child being processed.
                    parent_module = ModuleType(
                        parent_module_name,
                        doc="Empty module created by pytest's importmode=importlib.",
                    )
                modules[parent_module_name] = parent_module

            # Add child attribute to the parent that can reference the child
            # modules.
            if not hasattr(parent_module, child_name):
                setattr(parent_module, child_name, modules[module_name])

        # Move one level up: "a.b.c" -> "a.b" -> "a" -> "".
        module_parts.pop(-1)
        module_name = ".".join(module_parts)

762 

763 

def resolve_package_path(path: Path) -> Path | None:
    """Return the Python package path by looking for the last
    directory upwards which still contains an __init__.py.

    The walk starts at ``path`` itself and stops at the first directory
    that lacks an ``__init__.py`` or whose name is not a valid identifier.
    Entries that are not directories are skipped.

    Returns None if it cannot be determined.
    """
    outermost = None
    for candidate in (path, *path.parents):
        if not candidate.is_dir():
            continue
        if not (candidate / "__init__.py").is_file():
            break
        if not candidate.name.isidentifier():
            break
        outermost = candidate
    return outermost

779 

780 

def resolve_pkg_root_and_module_name(
    path: Path, *, consider_namespace_packages: bool = False
) -> tuple[Path, str]:
    """
    Return the path to the directory of the root package that contains the
    given Python file, and its module name:

        src/
            app/
                __init__.py
                core/
                    __init__.py
                    models.py

    Passing the full path to `models.py` will yield Path("src") and "app.core.models".

    If consider_namespace_packages is True, then we additionally check upwards in the hierarchy
    for namespace packages:

    https://packaging.python.org/en/latest/guides/packaging-namespace-packages

    Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files).
    """
    package_dir = resolve_package_path(path)
    pkg_root: Path | None = None if package_dir is None else package_dir.parent

    if consider_namespace_packages:
        # Walk upwards from the regular package root (or from the file's
        # directory when there is none) until an importable name is found.
        search_start = pkg_root if pkg_root is not None else path.parent
        for candidate in (search_start, *search_start.parents):
            candidate_name = compute_module_name(candidate, path)
            if candidate_name and is_importable(candidate_name, path):
                # Point the pkg_root to the root of the namespace package.
                pkg_root = candidate
                break

    if pkg_root is not None:
        module_name = compute_module_name(pkg_root, path)
        if module_name:
            return pkg_root, module_name

    raise CouldNotResolvePathError(f"Could not resolve for {path}")

823 

824 

def is_importable(module_name: str, module_path: Path) -> bool:
    """
    Return if the given module path could be imported normally by Python, akin to the user
    entering the REPL and importing the corresponding module name directly, and corresponds
    to the module_path specified.

    :param module_name:
        Full module name that we want to check if is importable.
        For example, "app.models".

    :param module_path:
        Full path to the python module/package we want to check if is importable.
        For example, "/projects/src/app/models.py".
    """
    # Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through
    # sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``).
    # Using importlib.util.find_spec() is different, it gives the same results as trying to import
    # the module normally in the REPL.
    try:
        found_spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError, ImportWarning):
        return False
    return spec_matches_module_path(found_spec, module_path)

849 

850 

def compute_module_name(root: Path, module_path: Path) -> str | None:
    """Compute a module name based on a path and a root anchor.

    :param root: Directory the name is computed relative to.
    :param module_path: Path of the module or package.
    :returns:
        The dotted name, without a trailing ``__init__`` component, or None
        when the suffix cannot be stripped, when ``module_path`` is not
        below ``root``, or when the relative path has no parts.
    """
    try:
        stem_path = module_path.with_suffix("")
    except ValueError:
        # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter).
        return None

    try:
        name_parts = stem_path.relative_to(root).parts
    except ValueError:  # pragma: no cover
        return None

    if not name_parts:
        return None
    if name_parts[-1] == "__init__":
        name_parts = name_parts[:-1]
    return ".".join(name_parts)

869 

870 

class CouldNotResolvePathError(Exception):
    """Raised by resolve_pkg_root_and_module_name when no package root and
    module name can be determined for a path."""

873 

874 

def scandir(
    path: str | os.PathLike[str],
    sort_key: Callable[[os.DirEntry[str]], object] = lambda entry: entry.name,
) -> list[os.DirEntry[str]]:
    """Return the entries found directly inside a directory, sorted.

    The returned entries are sorted according to the given key.
    The default is to sort by name.

    Entries whose ``is_file()`` check fails with one of the tolerated error
    codes are left out; any other ``OSError`` is re-raised.
    """
    usable = []
    with os.scandir(path) as directory:
        for item in directory:
            # Skip entries with symlink loops and other brokenness, so the caller
            # doesn't have to deal with it.
            try:
                item.is_file()
            except OSError as err:
                if not _ignore_error(err):
                    raise
            else:
                usable.append(item)
    usable.sort(key=sort_key)  # type: ignore[arg-type]
    return usable

898 

899 

def visit(
    path: str | os.PathLike[str], recurse: Callable[[os.DirEntry[str]], bool]
) -> Iterator[os.DirEntry[str]]:
    """Walk a directory recursively.

    All entries of a directory are yielded first; after that, each of its
    sub-directories accepted by ``recurse`` is walked in turn.

    The `recurse` predicate determines whether a directory is recursed.

    Entries at each directory level are sorted.
    """
    level = scandir(path)
    yield from level
    for item in level:
        if item.is_dir() and recurse(item):
            yield from visit(item.path, recurse)

914 

915 

def absolutepath(path: str | os.PathLike[str]) -> Path:
    """Convert a path to an absolute path using os.path.abspath.

    Prefer this over Path.resolve() (see #6523).
    Prefer this over Path.absolute() (not public, doesn't normalize).
    """
    absolute = os.path.abspath(path)
    return Path(absolute)

923 

924 

def commonpath(path1: Path, path2: Path) -> Path | None:
    """Return the common part shared with the other path, or None if
    ``os.path.commonpath`` rejects the pair with a ``ValueError``.

    If one path is relative and one is absolute, returns None.
    """
    pair = (str(path1), str(path2))
    try:
        shared = os.path.commonpath(pair)
    except ValueError:
        return None
    return Path(shared)

935 

936 

def bestrelpath(directory: Path, dest: Path) -> str:
    """Return a string which is a relative path from directory to dest such
    that directory/bestrelpath == dest.

    The paths must be either both absolute or both relative.

    If no such path can be determined, returns dest.
    """
    assert isinstance(directory, Path)
    assert isinstance(dest, Path)
    if dest == directory:
        return os.curdir

    # Find the longest common directory.
    base = commonpath(directory, dest)
    # Can be the case on Windows for two absolute paths on different drives.
    # Can be the case for two relative paths without common prefix.
    # Can be the case for a relative path and an absolute path.
    if not base:
        return str(dest)

    # Back from directory to base.
    climb = [os.pardir] * len(directory.relative_to(base).parts)
    # Forward from base to dest.
    descend = dest.relative_to(base).parts
    return os.path.join(*climb, *descend)

964 

965 

def safe_exists(p: Path) -> bool:
    """Like Path.exists(), but account for input arguments that might be too long (#11394).

    A ``ValueError`` or ``OSError`` raised by the check is reported as
    "does not exist".
    """
    try:
        found = p.exists()
    except (ValueError, OSError):
        # ValueError: stat: path too long for Windows
        # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
        found = False
    return found