Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/_pytest/pathlib.py: 22%

416 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 07:17 +0000

1import atexit 

2import contextlib 

3import fnmatch 

4import importlib.util 

5import itertools 

6import os 

7import shutil 

8import sys 

9import types 

10import uuid 

11import warnings 

12from enum import Enum 

13from errno import EBADF 

14from errno import ELOOP 

15from errno import ENOENT 

16from errno import ENOTDIR 

17from functools import partial 

18from os.path import expanduser 

19from os.path import expandvars 

20from os.path import isabs 

21from os.path import sep 

22from pathlib import Path 

23from pathlib import PurePath 

24from posixpath import sep as posix_sep 

25from types import ModuleType 

26from typing import Callable 

27from typing import Dict 

28from typing import Iterable 

29from typing import Iterator 

30from typing import List 

31from typing import Optional 

32from typing import Set 

33from typing import Tuple 

34from typing import Type 

35from typing import TypeVar 

36from typing import Union 

37 

38from _pytest.compat import assert_never 

39from _pytest.outcomes import skip 

40from _pytest.warning_types import PytestWarning 

41 

# Three days, expressed in seconds. Not referenced elsewhere in this module;
# presumably passed by callers as ``lock_timeout`` -- TODO confirm.
LOCK_TIMEOUT = 3 * 24 * 60 * 60


# Type variable preserving the concrete PurePath subclass across a call.
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)

# The following function, variables and comments were
# copied from cpython 3.9 Lib/pathlib.py file.

# errno values that _ignore_error() treats as ignorable.
# EBADF - guard against macOS `stat` throwing EBADF
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)

# Windows error codes that _ignore_error() treats as ignorable.
_IGNORED_WINERRORS = (
    21,  # ERROR_NOT_READY - drive exists but is not accessible
    1921,  # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
)

57 

58 

def _ignore_error(exception):
    """Tell whether ``exception`` carries an error code listed as ignorable.

    The ``errno`` attribute is checked against ``_IGNORED_ERRORS`` first, then
    the ``winerror`` attribute against ``_IGNORED_WINERRORS``. A missing
    attribute is treated as ``None``.
    """
    if getattr(exception, "errno", None) in _IGNORED_ERRORS:
        return True
    return getattr(exception, "winerror", None) in _IGNORED_WINERRORS

64 

65 

def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """Return the location of the ``.lock`` entry directly inside ``path``."""
    lock_name = ".lock"
    return path.joinpath(lock_name)

68 

69 

def on_rm_rf_error(
    func,
    path: str,
    excinfo: Union[
        BaseException,
        Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]],
    ],
    *,
    start_path: Path,
) -> bool:
    """Error callback for ``shutil.rmtree`` that retries read-only failures.

    :param func: The function that failed while removing ``path``.
    :param path: The path that could not be removed.
    :param excinfo: Either the exception itself or an exc_info-style triple.
    :param start_path:
        The path originally given to ``rm_rf``; the upwards chmod walk stops
        once this directory has been made writable.
    :returns:
        True if permissions were relaxed and ``func(path)`` was retried,
        False if the error was ignored or only warned about.

    The returned value is used only by our own tests.
    """
    error = excinfo if isinstance(excinfo, BaseException) else excinfo[1]

    # Another process removed the file in the middle of the "rm_rf" (xdist for example).
    # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(error, FileNotFoundError):
        return False

    if not isinstance(error, PermissionError):
        warnings.warn(
            PytestWarning(f"(rm_rf) error removing {path}\n{type(error)}: {error}")
        )
        return False

    retryable_funcs = (os.rmdir, os.remove, os.unlink)
    silently_skipped_funcs = (os.open,)
    if func not in retryable_funcs:
        # Failures of os.open are skipped without a warning; anything else is
        # reported as an unknown function.
        if func not in silently_skipped_funcs:
            warnings.warn(
                PytestWarning(
                    "(rm_rf) unknown function {} when removing {}:\n{}: {}".format(
                        func, path, type(error), error
                    )
                )
            )
        return False

    # Chmod + retry.
    import stat

    owner_read_write = stat.S_IRUSR | stat.S_IWUSR

    def grant_owner_rw(location: str) -> None:
        current_mode = os.stat(location).st_mode
        os.chmod(location, current_mode | owner_read_write)

    # For files, every directory above them must be writable as well, so walk
    # upwards until the directory originally passed to rm_rf has been handled.
    failed = Path(path)
    if failed.is_file():
        for ancestor in failed.parents:
            grant_owner_rw(str(ancestor))
            if ancestor == start_path:
                break
    grant_owner_rw(str(path))

    func(path)
    return True

131 

132 

def ensure_extended_length_path(path: Path) -> Path:
    """Return ``path`` in Windows "extended-length" form where applicable.

    Windows limits ordinary paths to MAX_PATH (260) characters by default, and
    operations on longer paths fail. Converting a path to extended-length form
    lifts that limit:
    https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation

    When ``sys.platform`` starts with ``"win32"`` the path is resolved and
    converted; on every other platform it is returned unchanged.
    """
    if not sys.platform.startswith("win32"):
        return path
    resolved = path.resolve()
    return Path(get_extended_length_path_str(str(resolved)))

149 

150 

def get_extended_length_path_str(path: str) -> str:
    r"""Prefix ``path`` so that it becomes a Windows extended-length path.

    * A path that already starts with ``\\?\`` is returned untouched.
    * A UNC path (leading ``\\``) has those two backslashes replaced by
      ``\\?\UNC\``.
    * Any other path simply gets ``\\?\`` prepended.
    """
    plain_prefix = "\\\\?\\"
    unc_prefix = "\\\\?\\UNC\\"
    already_extended = path.startswith((plain_prefix, unc_prefix))
    if already_extended:
        return path
    is_unc = path.startswith("\\\\")
    if is_unc:
        return unc_prefix + path[2:]
    return plain_prefix + path

161 

162 

def rm_rf(path: Path) -> None:
    """Recursively delete ``path``, coping with read-only entries.

    Errors raised during the removal are routed to ``on_rm_rf_error``.
    """
    target = ensure_extended_length_path(path)
    handler = partial(on_rm_rf_error, start_path=target)
    # shutil.rmtree takes the handler as ``onexc`` from Python 3.12 onwards
    # and as ``onerror`` on older versions.
    handler_keyword = "onexc" if sys.version_info >= (3, 12) else "onerror"
    shutil.rmtree(str(target), **{handler_keyword: handler})

172 

173 

def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
    """Yield the entries of ``root`` whose name starts with ``prefix``.

    The comparison is case insensitive (both sides are lower-cased).
    """
    wanted = prefix.lower()
    yield from (
        entry for entry in root.iterdir() if entry.name.lower().startswith(wanted)
    )

180 

181 

def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
    """Yield what follows the prefix in the name of each path.

    The first ``len(prefix)`` characters of every name are dropped; names are
    not checked to actually start with ``prefix``.

    :param iter: Iterator over path names.
    :param prefix: Expected prefix of the path names.
    """
    offset = len(prefix)
    yield from (entry.name[offset:] for entry in iter)

191 

192 

def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """Combine find_prefixed and extract_suffixes.

    Yields the name suffixes of the entries in ``root`` that start with
    ``prefix``.
    """
    matching_entries = find_prefixed(root, prefix)
    return extract_suffixes(matching_entries, prefix)

196 

197 

def parse_num(maybe_num) -> int:
    """Convert a path suffix to an int, yielding -1 when ``int()`` raises ValueError."""
    try:
        number = int(maybe_num)
    except ValueError:
        number = -1
    return number

204 

205 

def _force_symlink(
    root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
    """Best-effort (re)creation of the symlink ``root/target`` -> ``link_to``.

    Any existing entry at that location is unlinked first. Failures of either
    step are swallowed on purpose: the procedure is full of race conditions
    that are reasonably OK to ignore for the context of best effort linking
    to the latest test run.

    The presumption being that in case of much parallelism
    the inaccuracy is going to be acceptable.
    """
    link = root.joinpath(target)
    with contextlib.suppress(OSError):
        link.unlink()
    with contextlib.suppress(Exception):
        link.symlink_to(link_to)

226 

227 

def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
    """Create ``root/<prefix><N>`` where N is one above the highest existing number.

    Up to 10 attempts are made. After a successful ``mkdir`` the
    ``<prefix>current`` symlink is pointed at the new directory.

    :raises OSError: If no directory could be created within 10 attempts.
    """
    for _attempt in range(10):
        # The highest number is recomputed on every attempt.
        highest = max(
            (parse_num(suffix) for suffix in find_suffixes(root, prefix)), default=-1
        )
        candidate = root.joinpath(f"{prefix}{highest + 1}")
        try:
            candidate.mkdir(mode=mode)
        except Exception:
            # Any failure to create the directory just triggers another try.
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate

    raise OSError(
        "could not create numbered dir with prefix "
        "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
    )

247 

248 

def create_cleanup_lock(p: Path) -> Path:
    """Create a lock to prevent premature folder cleanup.

    The lock file is created exclusively (``O_CREAT | O_EXCL``) inside ``p``
    and the current process id is written into it.

    :param p: Directory in which the lock file is created.
    :returns: The path of the newly created lock file.
    :raises OSError:
        If the lock file already exists, or if it is no longer a regular file
        right after creation. Other ``OSError``s raised by ``os.open`` or
        ``os.write`` propagate unchanged.
    """
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise OSError(f"cannot create lockfile in {p}") from e

    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # Close the descriptor even when the write fails, so it never leaks.
        os.close(fd)

    if not lock_path.is_file():
        raise OSError("lock path got renamed after successful creation")
    return lock_path

264 

265 

def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
    """Schedule removal of ``lock_path`` through ``register`` (atexit by default).

    :returns: Whatever ``register`` returns for the cleanup callback.
    """
    pid = os.getpid()

    def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
        # Only the process that registered the cleanup removes the lock; a
        # different pid means we are running in a fork.
        if os.getpid() == original_pid:
            with contextlib.suppress(OSError):
                lock_path.unlink()

    return register(cleanup_on_exit)

281 

282 

def maybe_delete_a_numbered_dir(path: Path) -> None:
    """Delete a numbered directory, provided its lock can be acquired.

    The directory is first renamed to a unique ``garbage-<uuid>`` sibling and
    then removed. Any ``OSError`` along the way is ignored.
    """
    path = ensure_extended_length_path(path)
    lock_path = None
    try:
        lock_path = create_cleanup_lock(path)
        garbage = path.parent.joinpath(f"garbage-{uuid.uuid4()}")
        path.rename(garbage)
        rm_rf(garbage)
    except OSError:
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        pass
    finally:
        # A lock we managed to create is always removed again, even when
        # deleting the numbered dir itself did not work out.
        if lock_path is not None:
            with contextlib.suppress(OSError):
                lock_path.unlink()

309 

310 

def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """Decide whether ``path`` may be deleted, judging by its lock file.

    * Symlinks are never deletable.
    * A directory without a lock file is deletable.
    * A directory whose lock is older than
      ``consider_lock_dead_if_created_before`` is deletable only if that stale
      lock could be removed.
    """
    if path.is_symlink():
        return False

    lock = get_lock_path(path)
    try:
        lock_present = lock.is_file()
    except OSError:
        # we might not have access to the lock file at all, in this case assume
        # we don't have access to the entire directory (#7491).
        return False
    if not lock_present:
        return True

    try:
        lock_mtime = lock.stat().st_mtime
    except Exception:
        return False

    if lock_mtime < consider_lock_dead_if_created_before:
        # We want to ignore any errors while trying to remove the lock such as:
        # - PermissionDenied, like the file permissions have changed since the lock creation;
        # - FileNotFoundError, in case another pytest process got here first;
        # and any other cause of failure.
        # A failed removal means the directory is reported as not deletable.
        try:
            lock.unlink()
        except OSError:
            return False
        return True
    return False

337 

338 

def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """Remove ``path`` if ``ensure_deletable`` approves; otherwise do nothing."""
    if not ensure_deletable(path, consider_lock_dead_if_created_before):
        return
    maybe_delete_a_numbered_dir(path)

343 

344 

def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """Yield the prefixed entries of ``root`` eligible for removal - follows py.path.

    An entry is a candidate when its parsed number is at most
    ``highest existing number - keep``.
    """
    highest = max(
        (parse_num(suffix) for suffix in find_suffixes(root, prefix)), default=-1
    )
    max_delete = highest - keep
    # Duplicate the entry stream: one copy is yielded from, the other one is
    # turned into the matching sequence of numbers.
    entries, entries_for_numbers = itertools.tee(find_prefixed(root, prefix))
    numbers = map(parse_num, extract_suffixes(entries_for_numbers, prefix))
    yield from (
        entry for entry, number in zip(entries, numbers) if number <= max_delete
    )

355 

356 

def cleanup_dead_symlinks(root: Path):
    """Unlink every symlink directly inside ``root`` whose resolved target is gone."""
    for entry in root.iterdir():
        if not entry.is_symlink():
            continue
        if entry.resolve().exists():
            continue
        entry.unlink()

362 

363 

def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """Run the lock-driven cleanup of numbered directories below ``root``.

    In order: outdated numbered directories, leftover ``garbage-*`` entries,
    and finally dead symlinks. Does nothing when ``root`` does not exist.
    """
    if not root.exists():
        return

    deadline = consider_lock_dead_if_created_before
    for numbered in cleanup_candidates(root, prefix, keep):
        try_cleanup(numbered, deadline)
    for leftover in root.glob("garbage-*"):
        try_cleanup(leftover, deadline)

    cleanup_dead_symlinks(root)

376 

377 

def make_numbered_dir_with_cleanup(
    root: Path,
    prefix: str,
    keep: int,
    lock_timeout: float,
    mode: int,
) -> Path:
    """Create a locked numbered directory and schedule removal of old ones.

    Up to 10 attempts are made. On success, ``cleanup_numbered_dir`` is
    registered with ``atexit`` and the new directory is returned. If every
    attempt fails, the exception of the last attempt is raised.
    """
    last_error = None
    for _attempt in range(10):
        try:
            new_dir = make_numbered_dir(root, prefix, mode)
            # Only lock the current dir when keep is not 0
            if keep != 0:
                register_cleanup_lock_removal(create_cleanup_lock(new_dir))
        except Exception as exc:
            last_error = exc
            continue

        # Locks whose mtime is older than the new directory's mtime minus
        # the timeout are considered dead.
        lock_deadline = new_dir.stat().st_mtime - lock_timeout
        # Register a cleanup for program exit
        atexit.register(cleanup_numbered_dir, root, prefix, keep, lock_deadline)
        return new_dir

    assert last_error is not None
    raise last_error

409 

410 

def resolve_from_str(input: str, rootpath: Path) -> Path:
    """Turn a user-supplied path string into a ``Path``.

    ``~`` and environment variables are expanded first. An absolute result is
    returned as is; a relative one is joined onto ``rootpath``.
    """
    expanded = expandvars(expanduser(input))
    if isabs(expanded):
        return Path(expanded)
    return rootpath.joinpath(expanded)

418 

419 

def fnmatch_ex(pattern: str, path: Union[str, "os.PathLike[str]"]) -> bool:
    """A port of FNMatcher from py.path.common which works with PurePath() instances.

    Unlike PurePath.match(), which applies "**" glob expressions to each part
    of the path separately, this algorithm matches against the path as a
    whole.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    A pattern without a path separator is matched against the file name only.
    Otherwise it is matched against the full path string, and a non-absolute
    pattern applied to an absolute path gets ``*<sep>`` prepended.

    This algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according to this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    pure_path = PurePath(path)
    on_windows = sys.platform.startswith("win")

    if on_windows and sep not in pattern and posix_sep in pattern:
        # Running on Windows, the pattern has no Windows path separators,
        # and the pattern has one or more Posix path separators. Replace
        # the Posix path separators with the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    if sep not in pattern:
        return fnmatch.fnmatch(pure_path.name, pattern)

    if pure_path.is_absolute() and not os.path.isabs(pattern):
        pattern = f"*{os.sep}{pattern}"
    return fnmatch.fnmatch(str(pure_path), pattern)

454 

455 

def parts(s: str) -> Set[str]:
    """Return every leading sub-path of ``s``, split on the OS path separator.

    An empty leading component (as produced by a path that starts with the
    separator) is represented by the separator itself.
    """
    pieces = s.split(sep)
    prefixes = set()
    for end in range(1, len(pieces) + 1):
        prefixes.add(sep.join(pieces[:end]) or sep)
    return prefixes

459 

460 

def symlink_or_skip(src, dst, **kwargs):
    """Create the symlink ``dst`` -> ``src``; skip the test if that raises OSError.

    Extra keyword arguments are forwarded to ``os.symlink``.
    """
    try:
        os.symlink(str(src), str(dst), **kwargs)
    except OSError as error:
        skip(f"symlinks not supported: {error}")

467 

468 

class ImportMode(Enum):
    """The import strategies accepted by the `mode` parameter of `import_path`."""

    # Put the module's base directory at the start of sys.path.
    prepend = "prepend"
    # Put the module's base directory at the end of sys.path.
    append = "append"
    # Import through importlib machinery without touching sys.path.
    importlib = "importlib"

475 

476 

class ImportPathMismatchError(ImportError):
    """Raised by import_path() when the imported module's __file__ does not
    match the requested path.

    This can happen when `import_path` is called multiple times with different
    filenames that have the same basename but reside in packages
    (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
    """

484 

485 

def import_path(
    p: Union[str, "os.PathLike[str]"],
    *,
    mode: Union[str, ImportMode] = ImportMode.prepend,
    root: Path,
) -> ModuleType:
    """Import and return a module from the given path, which can be a file (a module) or
    a directory (a package).

    How the import happens depends on `mode`:

    * `mode == ImportMode.prepend`: the directory containing the module (or package, taking
      `__init__.py` files into account) is inserted at the *start* of `sys.path` and the
      module is then imported with `importlib.import_module`.

    * `mode == ImportMode.append`: like `prepend`, except that the directory is added to the
      *end* of `sys.path`, and only if it is not in `sys.path` yet.

    * `mode == ImportMode.importlib`: the module is loaded through the finer-grained
      `importlib` machinery, leaving `sys.path` alone. This effectively allows same-named
      test modules to live in different places.

    :param root:
        Used as an anchor when mode == ImportMode.importlib to obtain
        a unique name for the module being imported so it can safely be stored
        into ``sys.modules``.

    :raises ImportError:
        If the path does not exist, or (importlib mode) no spec can be found.

    :raises ImportPathMismatchError:
        If, after importing, the given `path` and the module `__file__`
        are different. Only raised in `prepend` and `append` modes.
    """
    mode = ImportMode(mode)
    path = Path(p)

    if not path.exists():
        raise ImportError(path)

    if mode is ImportMode.importlib:
        module_name = module_name_from_path(path, root)
        # Reuse the module if it has been imported under this name already.
        try:
            already_imported = sys.modules[module_name]
        except KeyError:
            pass
        else:
            return already_imported

        # Ask every meta path finder; fall back to a plain file-location spec
        # only when none of them produced a spec.
        for finder in sys.meta_path:
            spec = finder.find_spec(module_name, [str(path.parent)])
            if spec is not None:
                break
        else:
            spec = importlib.util.spec_from_file_location(module_name, str(path))

        if spec is None:
            raise ImportError(f"Can't find module {module_name} at location {path}")
        mod = importlib.util.module_from_spec(spec)
        # The module is registered before it is executed.
        sys.modules[module_name] = mod
        spec.loader.exec_module(mod)  # type: ignore[union-attr]
        insert_missing_modules(sys.modules, module_name)
        return mod

    pkg_path = resolve_package_path(path)
    if pkg_path is None:
        pkg_root = path.parent
        module_name = path.stem
    else:
        pkg_root = pkg_path.parent
        dotted_parts = list(path.with_suffix("").relative_to(pkg_root).parts)
        if dotted_parts[-1] == "__init__":
            dotted_parts.pop()
        module_name = ".".join(dotted_parts)

    # Change sys.path permanently: restoring it at the end of this function would cause surprising
    # problems because of delayed imports: for example, a conftest.py file imported by this function
    # might have local imports, which would fail at runtime if we restored sys.path.
    pkg_root_str = str(pkg_root)
    if mode is ImportMode.append:
        if pkg_root_str not in sys.path:
            sys.path.append(pkg_root_str)
    elif mode is ImportMode.prepend:
        if pkg_root_str != sys.path[0]:
            sys.path.insert(0, pkg_root_str)
    else:
        assert_never(mode)

    importlib.import_module(module_name)

    mod = sys.modules[module_name]
    if path.name == "__init__.py":
        return mod

    # The __file__ consistency check can be switched off via the environment.
    if os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") == "1":
        return mod

    module_file = mod.__file__
    if module_file is None:
        raise ImportPathMismatchError(module_name, module_file, path)

    # Normalise __file__: "x.pyc"/"x.pyo" -> "x.py", and a package's
    # "<dir>/__init__.py" -> "<dir>".
    if module_file.endswith((".pyc", ".pyo")):
        module_file = module_file[:-1]
    init_tail = os.sep + "__init__.py"
    if module_file.endswith(init_tail):
        module_file = module_file[: -len(init_tail)]

    try:
        is_same = _is_same(str(path), module_file)
    except FileNotFoundError:
        is_same = False

    if not is_same:
        raise ImportPathMismatchError(module_name, module_file, path)

    return mod

593 

594 

# On Windows, _is_same additionally accepts two filenames that compare equal
# as paths, to circumvent os.path.samefile returning False for mounts in UNC
# (#7678). Elsewhere it is a plain os.path.samefile check.
if sys.platform.startswith("win"):

    def _is_same(f1: str, f2: str) -> bool:
        if Path(f1) == Path(f2):
            return True
        return os.path.samefile(f1, f2)

else:

    def _is_same(f1: str, f2: str) -> bool:
        same_file = os.path.samefile(f1, f2)
        return same_file

606 

607 

def module_name_from_path(path: Path, root: Path) -> str:
    """
    Return a dotted module name based on the given path, anchored on root.

    For example: path="/projects/src/tests/test_foo.py" and root="/projects", the
    resulting module name will be "src.tests.test_foo".

    :param path: Path of the module file or package directory.
    :param root: Directory the dotted name is made relative to.
    :returns:
        The dotted name. An ``__init__`` file located directly in ``root``
        yields ``"__init__"`` rather than an empty name; the name is empty
        only when ``path`` (minus its suffix) is ``root`` itself.
    """
    suffixless = path.with_suffix("")
    try:
        relative_path = suffixless.relative_to(root)
    except ValueError:
        # If we can't get a relative path to root, use the full path, except
        # for the first part ("d:\\" or "/" depending on the platform, for example).
        path_parts = suffixless.parts[1:]
    else:
        # Use the parts for the relative path to the root path.
        path_parts = relative_path.parts

    # Module name for packages do not contain the __init__ file, unless the
    # __init__ file sits directly at the root: dropping it there would leave
    # an empty module name. The length check also avoids indexing into an
    # empty tuple when there are no parts at all.
    if len(path_parts) >= 2 and path_parts[-1] == "__init__":
        path_parts = path_parts[:-1]

    return ".".join(path_parts)

631 

632 

def insert_missing_modules(modules: Dict[str, ModuleType], module_name: str) -> None:
    """
    Used by ``import_path`` to create intermediate modules when using mode=importlib.

    After "src.tests.test_foo" has been inserted, for example, the parents
    "src" and "src.tests" have to exist as well (as empty modules if they
    cannot be imported), otherwise "src.tests.test_foo" is not importable by
    ``__import__``. Each parent also receives its child as an attribute.
    """
    # NOTE(review): the nesting of the ``modules[...] = ...`` assignment is
    # reconstructed from a whitespace-flattened listing; confirm that it
    # belongs inside the ``hasattr`` guard.
    child: Optional[ModuleType] = None
    child_attr = ""
    current_name = module_name
    # Walk from the full dotted name up to the top-level package.
    while current_name:
        if current_name in modules:
            current = modules[current_name]
        else:
            try:
                # If sys.meta_path is empty, calling import_module will issue
                # a warning and raise ModuleNotFoundError. To avoid the
                # warning, we check sys.meta_path explicitly and raise the error
                # ourselves to fall back to creating a dummy module.
                if not sys.meta_path:
                    raise ModuleNotFoundError
                current = importlib.import_module(current_name)
            except ModuleNotFoundError:
                current = ModuleType(
                    current_name,
                    doc="Empty module created by pytest's importmode=importlib.",
                )
        if child and not hasattr(current, child_attr):
            # Make the child reachable as an attribute of its parent.
            setattr(current, child_attr, child)
            modules[current_name] = current
        # Move one level up: the current module becomes the child, and the
        # last dotted component becomes the attribute name to set on the parent.
        current_name, _, child_attr = current_name.rpartition(".")
        child = current

672 

673 

def resolve_package_path(path: Path) -> Optional[Path]:
    """Find the outermost package directory that ``path`` belongs to.

    Starting at ``path`` and walking upwards, each directory qualifies as
    long as it contains an ``__init__.py`` file and its name is a valid
    identifier; the walk stops at the first directory that does not.

    Returns None if it can not be determined.
    """
    package_dir = None
    for candidate in (path, *path.parents):
        if not candidate.is_dir():
            continue
        has_init = candidate.joinpath("__init__.py").is_file()
        if not has_init or not candidate.name.isidentifier():
            break
        package_dir = candidate
    return package_dir

689 

690 

def scandir(path: Union[str, "os.PathLike[str]"]) -> List["os.DirEntry[str]"]:
    """Return the entries directly inside ``path``, sorted by name.

    Entries for which ``is_file()`` raises an ignorable ``OSError`` (as judged
    by ``_ignore_error``) are left out; any other ``OSError`` propagates.
    """
    usable = []
    with os.scandir(path) as iterator:
        # Skip entries with symlink loops and other brokenness, so the caller
        # doesn't have to deal with it.
        for item in iterator:
            try:
                item.is_file()
            except OSError as err:
                if not _ignore_error(err):
                    raise
            else:
                usable.append(item)
    return sorted(usable, key=lambda item: item.name)

710 

711 

def visit(
    path: Union[str, "os.PathLike[str]"], recurse: Callable[["os.DirEntry[str]"], bool]
) -> Iterator["os.DirEntry[str]"]:
    """Walk the directory tree below ``path``.

    All (sorted) entries of a directory are yielded first; afterwards each
    sub-directory accepted by the `recurse` predicate is walked in turn.
    """
    level = scandir(path)
    yield from level
    for item in level:
        if not item.is_dir():
            continue
        if recurse(item):
            yield from visit(item.path, recurse)

726 

727 

def absolutepath(path: Union[Path, str]) -> Path:
    """Return ``path`` as an absolute ``Path``, computed with os.path.abspath.

    Prefer this over Path.resolve() (see #6523).
    Prefer this over Path.absolute() (not public, doesn't normalize).
    """
    absolute = os.path.abspath(str(path))
    return Path(absolute)

735 

736 

def commonpath(path1: Path, path2: Path) -> Optional[Path]:
    """Return the longest path prefix both paths share.

    None is returned when ``os.path.commonpath`` raises ValueError, which
    includes the case of one path being relative and the other absolute.
    """
    try:
        shared = os.path.commonpath((str(path1), str(path2)))
    except ValueError:
        return None
    return Path(shared)

747 

748 

def bestrelpath(directory: Path, dest: Path) -> str:
    """Return a relative path string leading from ``directory`` to ``dest``,
    such that directory/bestrelpath == dest.

    The paths must be either both absolute or both relative.

    If no such path can be determined, returns dest.
    """
    assert isinstance(directory, Path)
    assert isinstance(dest, Path)
    if dest == directory:
        return os.curdir

    # Find the longest common directory.
    base = commonpath(directory, dest)
    # Can be the case on Windows for two absolute paths on different drives.
    # Can be the case for two relative paths without common prefix.
    # Can be the case for a relative path and an absolute path.
    if not base:
        return str(dest)

    # One ".." per component to climb from directory back to base ...
    climb = [os.pardir] * len(directory.relative_to(base).parts)
    # ... followed by the components leading from base down to dest.
    descend = dest.relative_to(base).parts
    return os.path.join(*climb, *descend)

776 

777 

# Originates from py.path.local.copy(), with significant trims and adjustments.
# TODO(py38): Replace with shutil.copytree(..., symlinks=True, dirs_exist_ok=True)
def copytree(source: Path, target: Path) -> None:
    """Copy the directory tree below ``source`` into ``target``.

    Symlinks are re-created as symlinks (and not descended into), regular
    files are copied with ``shutil.copyfile``, and directories are created.
    """
    assert source.is_dir()

    def descend_into(dir_entry) -> bool:
        # Do not walk into symlinked directories.
        return not dir_entry.is_symlink()

    for dir_entry in visit(source, recurse=descend_into):
        original = Path(dir_entry)
        copy = target / original.relative_to(source)
        copy.parent.mkdir(exist_ok=True)
        if original.is_symlink():
            copy.symlink_to(os.readlink(original))
        elif original.is_file():
            shutil.copyfile(original, copy)
        elif original.is_dir():
            copy.mkdir(exist_ok=True)

794 

795 

def safe_exists(p: Path) -> bool:
    """Like Path.exists(), but report False instead of raising for input
    arguments that might be too long (#11394)."""
    try:
        found = p.exists()
    except (ValueError, OSError):
        # ValueError: stat: path too long for Windows
        # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
        found = False
    return found