Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/shutil.py: 2%

796 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1"""Utility functions for copying and archiving files and directory trees. 

2 

3XXX The functions here don't copy the resource fork or other metadata on Mac. 

4 

5""" 

6 

7import os 

8import sys 

9import stat 

10import fnmatch 

11import collections 

12import errno 

13 

14try: 

15 import zlib 

16 del zlib 

17 _ZLIB_SUPPORTED = True 

18except ImportError: 

19 _ZLIB_SUPPORTED = False 

20 

21try: 

22 import bz2 

23 del bz2 

24 _BZ2_SUPPORTED = True 

25except ImportError: 

26 _BZ2_SUPPORTED = False 

27 

28try: 

29 import lzma 

30 del lzma 

31 _LZMA_SUPPORTED = True 

32except ImportError: 

33 _LZMA_SUPPORTED = False 

34 

35try: 

36 from pwd import getpwnam 

37except ImportError: 

38 getpwnam = None 

39 

40try: 

41 from grp import getgrnam 

42except ImportError: 

43 getgrnam = None 

44 

45_WINDOWS = os.name == 'nt' 

46posix = nt = None 

47if os.name == 'posix': 

48 import posix 

49elif _WINDOWS: 

50 import nt 

51 

52COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 

53_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") 

54_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS 

55 

56# CMD defaults in Windows 10 

57_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" 

58 

59__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", 

60 "copytree", "move", "rmtree", "Error", "SpecialFileError", 

61 "ExecError", "make_archive", "get_archive_formats", 

62 "register_archive_format", "unregister_archive_format", 

63 "get_unpack_formats", "register_unpack_format", 

64 "unregister_unpack_format", "unpack_archive", 

65 "ignore_patterns", "chown", "which", "get_terminal_size", 

66 "SameFileError"] 

67 # disk_usage is added later, if available on the platform 

68 

69class Error(OSError): 

70 pass 

71 

72class SameFileError(Error): 

73 """Raised when source and destination are the same file.""" 

74 

75class SpecialFileError(OSError): 

76 """Raised when trying to do a kind of operation (e.g. copying) which is 

77 not supported on a special file (e.g. a named pipe)""" 

78 

79class ExecError(OSError): 

80 """Raised when a command could not be executed""" 

81 

82class ReadError(OSError): 

83 """Raised when an archive cannot be read""" 

84 

85class RegistryError(Exception): 

86 """Raised when a registry operation with the archiving 

87 and unpacking registries fails""" 

88 

89class _GiveupOnFastCopy(Exception): 

90 """Raised as a signal to fallback on using raw read()/write() 

91 file copy when fast-copy functions fail to do so. 

92 """ 

93 

94def _fastcopy_fcopyfile(fsrc, fdst, flags): 

95 """Copy a regular file content or metadata by using high-performance 

96 fcopyfile(3) syscall (macOS). 

97 """ 

98 try: 

99 infd = fsrc.fileno() 

100 outfd = fdst.fileno() 

101 except Exception as err: 

102 raise _GiveupOnFastCopy(err) # not a regular file 

103 

104 try: 

105 posix._fcopyfile(infd, outfd, flags) 

106 except OSError as err: 

107 err.filename = fsrc.name 

108 err.filename2 = fdst.name 

109 if err.errno in {errno.EINVAL, errno.ENOTSUP}: 

110 raise _GiveupOnFastCopy(err) 

111 else: 

112 raise err from None 

113 

114def _fastcopy_sendfile(fsrc, fdst): 

115 """Copy data from one regular mmap-like fd to another by using 

116 high-performance sendfile(2) syscall. 

117 This should work on Linux >= 2.6.33 only. 

118 """ 

119 # Note: copyfileobj() is left alone in order to not introduce any 

120 # unexpected breakage. Possible risks by using zero-copy calls 

121 # in copyfileobj() are: 

122 # - fdst cannot be open in "a"(ppend) mode 

123 # - fsrc and fdst may be open in "t"(ext) mode 

124 # - fsrc may be a BufferedReader (which hides unread data in a buffer), 

125 # GzipFile (which decompresses data), HTTPResponse (which decodes 

126 # chunks). 

127 # - possibly others (e.g. encrypted fs/partition?) 

128 global _USE_CP_SENDFILE 

129 try: 

130 infd = fsrc.fileno() 

131 outfd = fdst.fileno() 

132 except Exception as err: 

133 raise _GiveupOnFastCopy(err) # not a regular file 

134 

135 # Hopefully the whole file will be copied in a single call. 

136 # sendfile() is called in a loop 'till EOF is reached (0 return) 

137 # so a bufsize smaller or bigger than the actual file size 

138 # should not make any difference, also in case the file content 

139 # changes while being copied. 

140 try: 

141 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB 

142 except OSError: 

143 blocksize = 2 ** 27 # 128MiB 

144 # On 32-bit architectures truncate to 1GiB to avoid OverflowError, 

145 # see bpo-38319. 

146 if sys.maxsize < 2 ** 32: 

147 blocksize = min(blocksize, 2 ** 30) 

148 

149 offset = 0 

150 while True: 

151 try: 

152 sent = os.sendfile(outfd, infd, offset, blocksize) 

153 except OSError as err: 

154 # ...in oder to have a more informative exception. 

155 err.filename = fsrc.name 

156 err.filename2 = fdst.name 

157 

158 if err.errno == errno.ENOTSOCK: 

159 # sendfile() on this platform (probably Linux < 2.6.33) 

160 # does not support copies between regular files (only 

161 # sockets). 

162 _USE_CP_SENDFILE = False 

163 raise _GiveupOnFastCopy(err) 

164 

165 if err.errno == errno.ENOSPC: # filesystem is full 

166 raise err from None 

167 

168 # Give up on first call and if no data was copied. 

169 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: 

170 raise _GiveupOnFastCopy(err) 

171 

172 raise err 

173 else: 

174 if sent == 0: 

175 break # EOF 

176 offset += sent 

177 

178def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): 

179 """readinto()/memoryview() based variant of copyfileobj(). 

180 *fsrc* must support readinto() method and both files must be 

181 open in binary mode. 

182 """ 

183 # Localize variable access to minimize overhead. 

184 fsrc_readinto = fsrc.readinto 

185 fdst_write = fdst.write 

186 with memoryview(bytearray(length)) as mv: 

187 while True: 

188 n = fsrc_readinto(mv) 

189 if not n: 

190 break 

191 elif n < length: 

192 with mv[:n] as smv: 

193 fdst.write(smv) 

194 else: 

195 fdst_write(mv) 

196 

197def copyfileobj(fsrc, fdst, length=0): 

198 """copy data from file-like object fsrc to file-like object fdst""" 

199 # Localize variable access to minimize overhead. 

200 if not length: 

201 length = COPY_BUFSIZE 

202 fsrc_read = fsrc.read 

203 fdst_write = fdst.write 

204 while True: 

205 buf = fsrc_read(length) 

206 if not buf: 

207 break 

208 fdst_write(buf) 

209 

210def _samefile(src, dst): 

211 # Macintosh, Unix. 

212 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): 

213 try: 

214 return os.path.samestat(src.stat(), os.stat(dst)) 

215 except OSError: 

216 return False 

217 

218 if hasattr(os.path, 'samefile'): 

219 try: 

220 return os.path.samefile(src, dst) 

221 except OSError: 

222 return False 

223 

224 # All other platforms: check for same pathname. 

225 return (os.path.normcase(os.path.abspath(src)) == 

226 os.path.normcase(os.path.abspath(dst))) 

227 

228def _stat(fn): 

229 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) 

230 

231def _islink(fn): 

232 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) 

233 

234def copyfile(src, dst, *, follow_symlinks=True): 

235 """Copy data from src to dst in the most efficient way possible. 

236 

237 If follow_symlinks is not set and src is a symbolic link, a new 

238 symlink will be created instead of copying the file it points to. 

239 

240 """ 

241 sys.audit("shutil.copyfile", src, dst) 

242 

243 if _samefile(src, dst): 

244 raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) 

245 

246 file_size = 0 

247 for i, fn in enumerate([src, dst]): 

248 try: 

249 st = _stat(fn) 

250 except OSError: 

251 # File most likely does not exist 

252 pass 

253 else: 

254 # XXX What about other special files? (sockets, devices...) 

255 if stat.S_ISFIFO(st.st_mode): 

256 fn = fn.path if isinstance(fn, os.DirEntry) else fn 

257 raise SpecialFileError("`%s` is a named pipe" % fn) 

258 if _WINDOWS and i == 0: 

259 file_size = st.st_size 

260 

261 if not follow_symlinks and _islink(src): 

262 os.symlink(os.readlink(src), dst) 

263 else: 

264 with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst: 

265 # macOS 

266 if _HAS_FCOPYFILE: 

267 try: 

268 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) 

269 return dst 

270 except _GiveupOnFastCopy: 

271 pass 

272 # Linux 

273 elif _USE_CP_SENDFILE: 

274 try: 

275 _fastcopy_sendfile(fsrc, fdst) 

276 return dst 

277 except _GiveupOnFastCopy: 

278 pass 

279 # Windows, see: 

280 # https://github.com/python/cpython/pull/7160#discussion_r195405230 

281 elif _WINDOWS and file_size > 0: 

282 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) 

283 return dst 

284 

285 copyfileobj(fsrc, fdst) 

286 

287 return dst 

288 

289def copymode(src, dst, *, follow_symlinks=True): 

290 """Copy mode bits from src to dst. 

291 

292 If follow_symlinks is not set, symlinks aren't followed if and only 

293 if both `src` and `dst` are symlinks. If `lchmod` isn't available 

294 (e.g. Linux) this method does nothing. 

295 

296 """ 

297 sys.audit("shutil.copymode", src, dst) 

298 

299 if not follow_symlinks and _islink(src) and os.path.islink(dst): 

300 if hasattr(os, 'lchmod'): 

301 stat_func, chmod_func = os.lstat, os.lchmod 

302 else: 

303 return 

304 else: 

305 stat_func, chmod_func = _stat, os.chmod 

306 

307 st = stat_func(src) 

308 chmod_func(dst, stat.S_IMODE(st.st_mode)) 

309 

310if hasattr(os, 'listxattr'): 

311 def _copyxattr(src, dst, *, follow_symlinks=True): 

312 """Copy extended filesystem attributes from `src` to `dst`. 

313 

314 Overwrite existing attributes. 

315 

316 If `follow_symlinks` is false, symlinks won't be followed. 

317 

318 """ 

319 

320 try: 

321 names = os.listxattr(src, follow_symlinks=follow_symlinks) 

322 except OSError as e: 

323 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): 

324 raise 

325 return 

326 for name in names: 

327 try: 

328 value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 

329 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 

330 except OSError as e: 

331 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, 

332 errno.EINVAL): 

333 raise 

334else: 

335 def _copyxattr(*args, **kwargs): 

336 pass 

337 

338def copystat(src, dst, *, follow_symlinks=True): 

339 """Copy file metadata 

340 

341 Copy the permission bits, last access time, last modification time, and 

342 flags from `src` to `dst`. On Linux, copystat() also copies the "extended 

343 attributes" where possible. The file contents, owner, and group are 

344 unaffected. `src` and `dst` are path-like objects or path names given as 

345 strings. 

346 

347 If the optional flag `follow_symlinks` is not set, symlinks aren't 

348 followed if and only if both `src` and `dst` are symlinks. 

349 """ 

350 sys.audit("shutil.copystat", src, dst) 

351 

352 def _nop(*args, ns=None, follow_symlinks=None): 

353 pass 

354 

355 # follow symlinks (aka don't not follow symlinks) 

356 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) 

357 if follow: 

358 # use the real function if it exists 

359 def lookup(name): 

360 return getattr(os, name, _nop) 

361 else: 

362 # use the real function only if it exists 

363 # *and* it supports follow_symlinks 

364 def lookup(name): 

365 fn = getattr(os, name, _nop) 

366 if fn in os.supports_follow_symlinks: 

367 return fn 

368 return _nop 

369 

370 if isinstance(src, os.DirEntry): 

371 st = src.stat(follow_symlinks=follow) 

372 else: 

373 st = lookup("stat")(src, follow_symlinks=follow) 

374 mode = stat.S_IMODE(st.st_mode) 

375 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), 

376 follow_symlinks=follow) 

377 # We must copy extended attributes before the file is (potentially) 

378 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. 

379 _copyxattr(src, dst, follow_symlinks=follow) 

380 try: 

381 lookup("chmod")(dst, mode, follow_symlinks=follow) 

382 except NotImplementedError: 

383 # if we got a NotImplementedError, it's because 

384 # * follow_symlinks=False, 

385 # * lchown() is unavailable, and 

386 # * either 

387 # * fchownat() is unavailable or 

388 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. 

389 # (it returned ENOSUP.) 

390 # therefore we're out of options--we simply cannot chown the 

391 # symlink. give up, suppress the error. 

392 # (which is what shutil always did in this circumstance.) 

393 pass 

394 if hasattr(st, 'st_flags'): 

395 try: 

396 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) 

397 except OSError as why: 

398 for err in 'EOPNOTSUPP', 'ENOTSUP': 

399 if hasattr(errno, err) and why.errno == getattr(errno, err): 

400 break 

401 else: 

402 raise 

403 

404def copy(src, dst, *, follow_symlinks=True): 

405 """Copy data and mode bits ("cp src dst"). Return the file's destination. 

406 

407 The destination may be a directory. 

408 

409 If follow_symlinks is false, symlinks won't be followed. This 

410 resembles GNU's "cp -P src dst". 

411 

412 If source and destination are the same file, a SameFileError will be 

413 raised. 

414 

415 """ 

416 if os.path.isdir(dst): 

417 dst = os.path.join(dst, os.path.basename(src)) 

418 copyfile(src, dst, follow_symlinks=follow_symlinks) 

419 copymode(src, dst, follow_symlinks=follow_symlinks) 

420 return dst 

421 

422def copy2(src, dst, *, follow_symlinks=True): 

423 """Copy data and metadata. Return the file's destination. 

424 

425 Metadata is copied with copystat(). Please see the copystat function 

426 for more information. 

427 

428 The destination may be a directory. 

429 

430 If follow_symlinks is false, symlinks won't be followed. This 

431 resembles GNU's "cp -P src dst". 

432 """ 

433 if os.path.isdir(dst): 

434 dst = os.path.join(dst, os.path.basename(src)) 

435 copyfile(src, dst, follow_symlinks=follow_symlinks) 

436 copystat(src, dst, follow_symlinks=follow_symlinks) 

437 return dst 

438 

439def ignore_patterns(*patterns): 

440 """Function that can be used as copytree() ignore parameter. 

441 

442 Patterns is a sequence of glob-style patterns 

443 that are used to exclude files""" 

444 def _ignore_patterns(path, names): 

445 ignored_names = [] 

446 for pattern in patterns: 

447 ignored_names.extend(fnmatch.filter(names, pattern)) 

448 return set(ignored_names) 

449 return _ignore_patterns 

450 

451def _copytree(entries, src, dst, symlinks, ignore, copy_function, 

452 ignore_dangling_symlinks, dirs_exist_ok=False): 

453 if ignore is not None: 

454 ignored_names = ignore(os.fspath(src), [x.name for x in entries]) 

455 else: 

456 ignored_names = set() 

457 

458 os.makedirs(dst, exist_ok=dirs_exist_ok) 

459 errors = [] 

460 use_srcentry = copy_function is copy2 or copy_function is copy 

461 

462 for srcentry in entries: 

463 if srcentry.name in ignored_names: 

464 continue 

465 srcname = os.path.join(src, srcentry.name) 

466 dstname = os.path.join(dst, srcentry.name) 

467 srcobj = srcentry if use_srcentry else srcname 

468 try: 

469 is_symlink = srcentry.is_symlink() 

470 if is_symlink and os.name == 'nt': 

471 # Special check for directory junctions, which appear as 

472 # symlinks but we want to recurse. 

473 lstat = srcentry.stat(follow_symlinks=False) 

474 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: 

475 is_symlink = False 

476 if is_symlink: 

477 linkto = os.readlink(srcname) 

478 if symlinks: 

479 # We can't just leave it to `copy_function` because legacy 

480 # code with a custom `copy_function` may rely on copytree 

481 # doing the right thing. 

482 os.symlink(linkto, dstname) 

483 copystat(srcobj, dstname, follow_symlinks=not symlinks) 

484 else: 

485 # ignore dangling symlink if the flag is on 

486 if not os.path.exists(linkto) and ignore_dangling_symlinks: 

487 continue 

488 # otherwise let the copy occur. copy2 will raise an error 

489 if srcentry.is_dir(): 

490 copytree(srcobj, dstname, symlinks, ignore, 

491 copy_function, dirs_exist_ok=dirs_exist_ok) 

492 else: 

493 copy_function(srcobj, dstname) 

494 elif srcentry.is_dir(): 

495 copytree(srcobj, dstname, symlinks, ignore, copy_function, 

496 dirs_exist_ok=dirs_exist_ok) 

497 else: 

498 # Will raise a SpecialFileError for unsupported file types 

499 copy_function(srcobj, dstname) 

500 # catch the Error from the recursive copytree so that we can 

501 # continue with other files 

502 except Error as err: 

503 errors.extend(err.args[0]) 

504 except OSError as why: 

505 errors.append((srcname, dstname, str(why))) 

506 try: 

507 copystat(src, dst) 

508 except OSError as why: 

509 # Copying file access times may fail on Windows 

510 if getattr(why, 'winerror', None) is None: 

511 errors.append((src, dst, str(why))) 

512 if errors: 

513 raise Error(errors) 

514 return dst 

515 

516def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, 

517 ignore_dangling_symlinks=False, dirs_exist_ok=False): 

518 """Recursively copy a directory tree and return the destination directory. 

519 

520 dirs_exist_ok dictates whether to raise an exception in case dst or any 

521 missing parent directory already exists. 

522 

523 If exception(s) occur, an Error is raised with a list of reasons. 

524 

525 If the optional symlinks flag is true, symbolic links in the 

526 source tree result in symbolic links in the destination tree; if 

527 it is false, the contents of the files pointed to by symbolic 

528 links are copied. If the file pointed by the symlink doesn't 

529 exist, an exception will be added in the list of errors raised in 

530 an Error exception at the end of the copy process. 

531 

532 You can set the optional ignore_dangling_symlinks flag to true if you 

533 want to silence this exception. Notice that this has no effect on 

534 platforms that don't support os.symlink. 

535 

536 The optional ignore argument is a callable. If given, it 

537 is called with the `src` parameter, which is the directory 

538 being visited by copytree(), and `names` which is the list of 

539 `src` contents, as returned by os.listdir(): 

540 

541 callable(src, names) -> ignored_names 

542 

543 Since copytree() is called recursively, the callable will be 

544 called once for each directory that is copied. It returns a 

545 list of names relative to the `src` directory that should 

546 not be copied. 

547 

548 The optional copy_function argument is a callable that will be used 

549 to copy each file. It will be called with the source path and the 

550 destination path as arguments. By default, copy2() is used, but any 

551 function that supports the same signature (like copy()) can be used. 

552 

553 """ 

554 sys.audit("shutil.copytree", src, dst) 

555 with os.scandir(src) as itr: 

556 entries = list(itr) 

557 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks, 

558 ignore=ignore, copy_function=copy_function, 

559 ignore_dangling_symlinks=ignore_dangling_symlinks, 

560 dirs_exist_ok=dirs_exist_ok) 

561 

562if hasattr(os.stat_result, 'st_file_attributes'): 

563 # Special handling for directory junctions to make them behave like 

564 # symlinks for shutil.rmtree, since in general they do not appear as 

565 # regular links. 

566 def _rmtree_isdir(entry): 

567 try: 

568 st = entry.stat(follow_symlinks=False) 

569 return (stat.S_ISDIR(st.st_mode) and not 

570 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 

571 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 

572 except OSError: 

573 return False 

574 

575 def _rmtree_islink(path): 

576 try: 

577 st = os.lstat(path) 

578 return (stat.S_ISLNK(st.st_mode) or 

579 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 

580 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 

581 except OSError: 

582 return False 

583else: 

584 def _rmtree_isdir(entry): 

585 try: 

586 return entry.is_dir(follow_symlinks=False) 

587 except OSError: 

588 return False 

589 

590 def _rmtree_islink(path): 

591 return os.path.islink(path) 

592 

593# version vulnerable to race conditions 

594def _rmtree_unsafe(path, onerror): 

595 try: 

596 with os.scandir(path) as scandir_it: 

597 entries = list(scandir_it) 

598 except OSError: 

599 onerror(os.scandir, path, sys.exc_info()) 

600 entries = [] 

601 for entry in entries: 

602 fullname = entry.path 

603 if _rmtree_isdir(entry): 

604 try: 

605 if entry.is_symlink(): 

606 # This can only happen if someone replaces 

607 # a directory with a symlink after the call to 

608 # os.scandir or entry.is_dir above. 

609 raise OSError("Cannot call rmtree on a symbolic link") 

610 except OSError: 

611 onerror(os.path.islink, fullname, sys.exc_info()) 

612 continue 

613 _rmtree_unsafe(fullname, onerror) 

614 else: 

615 try: 

616 os.unlink(fullname) 

617 except OSError: 

618 onerror(os.unlink, fullname, sys.exc_info()) 

619 try: 

620 os.rmdir(path) 

621 except OSError: 

622 onerror(os.rmdir, path, sys.exc_info()) 

623 

624# Version using fd-based APIs to protect against races 

625def _rmtree_safe_fd(topfd, path, onerror): 

626 try: 

627 with os.scandir(topfd) as scandir_it: 

628 entries = list(scandir_it) 

629 except OSError as err: 

630 err.filename = path 

631 onerror(os.scandir, path, sys.exc_info()) 

632 return 

633 for entry in entries: 

634 fullname = os.path.join(path, entry.name) 

635 try: 

636 is_dir = entry.is_dir(follow_symlinks=False) 

637 except OSError: 

638 is_dir = False 

639 else: 

640 if is_dir: 

641 try: 

642 orig_st = entry.stat(follow_symlinks=False) 

643 is_dir = stat.S_ISDIR(orig_st.st_mode) 

644 except OSError: 

645 onerror(os.lstat, fullname, sys.exc_info()) 

646 continue 

647 if is_dir: 

648 try: 

649 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) 

650 except OSError: 

651 onerror(os.open, fullname, sys.exc_info()) 

652 else: 

653 try: 

654 if os.path.samestat(orig_st, os.fstat(dirfd)): 

655 _rmtree_safe_fd(dirfd, fullname, onerror) 

656 try: 

657 os.rmdir(entry.name, dir_fd=topfd) 

658 except OSError: 

659 onerror(os.rmdir, fullname, sys.exc_info()) 

660 else: 

661 try: 

662 # This can only happen if someone replaces 

663 # a directory with a symlink after the call to 

664 # os.scandir or stat.S_ISDIR above. 

665 raise OSError("Cannot call rmtree on a symbolic " 

666 "link") 

667 except OSError: 

668 onerror(os.path.islink, fullname, sys.exc_info()) 

669 finally: 

670 os.close(dirfd) 

671 else: 

672 try: 

673 os.unlink(entry.name, dir_fd=topfd) 

674 except OSError: 

675 onerror(os.unlink, fullname, sys.exc_info()) 

676 

677_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 

678 os.supports_dir_fd and 

679 os.scandir in os.supports_fd and 

680 os.stat in os.supports_follow_symlinks) 

681 

682def rmtree(path, ignore_errors=False, onerror=None): 

683 """Recursively delete a directory tree. 

684 

685 If ignore_errors is set, errors are ignored; otherwise, if onerror 

686 is set, it is called to handle the error with arguments (func, 

687 path, exc_info) where func is platform and implementation dependent; 

688 path is the argument to that function that caused it to fail; and 

689 exc_info is a tuple returned by sys.exc_info(). If ignore_errors 

690 is false and onerror is None, an exception is raised. 

691 

692 """ 

693 sys.audit("shutil.rmtree", path) 

694 if ignore_errors: 

695 def onerror(*args): 

696 pass 

697 elif onerror is None: 

698 def onerror(*args): 

699 raise 

700 if _use_fd_functions: 

701 # While the unsafe rmtree works fine on bytes, the fd based does not. 

702 if isinstance(path, bytes): 

703 path = os.fsdecode(path) 

704 # Note: To guard against symlink races, we use the standard 

705 # lstat()/open()/fstat() trick. 

706 try: 

707 orig_st = os.lstat(path) 

708 except Exception: 

709 onerror(os.lstat, path, sys.exc_info()) 

710 return 

711 try: 

712 fd = os.open(path, os.O_RDONLY) 

713 except Exception: 

714 onerror(os.open, path, sys.exc_info()) 

715 return 

716 try: 

717 if os.path.samestat(orig_st, os.fstat(fd)): 

718 _rmtree_safe_fd(fd, path, onerror) 

719 try: 

720 os.rmdir(path) 

721 except OSError: 

722 onerror(os.rmdir, path, sys.exc_info()) 

723 else: 

724 try: 

725 # symlinks to directories are forbidden, see bug #1669 

726 raise OSError("Cannot call rmtree on a symbolic link") 

727 except OSError: 

728 onerror(os.path.islink, path, sys.exc_info()) 

729 finally: 

730 os.close(fd) 

731 else: 

732 try: 

733 if _rmtree_islink(path): 

734 # symlinks to directories are forbidden, see bug #1669 

735 raise OSError("Cannot call rmtree on a symbolic link") 

736 except OSError: 

737 onerror(os.path.islink, path, sys.exc_info()) 

738 # can't continue even if onerror hook returns 

739 return 

740 return _rmtree_unsafe(path, onerror) 

741 

742# Allow introspection of whether or not the hardening against symlink 

743# attacks is supported on the current platform 

744rmtree.avoids_symlink_attacks = _use_fd_functions 

745 

746def _basename(path): 

747 """A basename() variant which first strips the trailing slash, if present. 

748 Thus we always get the last component of the path, even for directories. 

749 

750 path: Union[PathLike, str] 

751 

752 e.g. 

753 >>> os.path.basename('/bar/foo') 

754 'foo' 

755 >>> os.path.basename('/bar/foo/') 

756 '' 

757 >>> _basename('/bar/foo/') 

758 'foo' 

759 """ 

760 path = os.fspath(path) 

761 sep = os.path.sep + (os.path.altsep or '') 

762 return os.path.basename(path.rstrip(sep)) 

763 

764def move(src, dst, copy_function=copy2): 

765 """Recursively move a file or directory to another location. This is 

766 similar to the Unix "mv" command. Return the file or directory's 

767 destination. 

768 

769 If the destination is a directory or a symlink to a directory, the source 

770 is moved inside the directory. The destination path must not already 

771 exist. 

772 

773 If the destination already exists but is not a directory, it may be 

774 overwritten depending on os.rename() semantics. 

775 

776 If the destination is on our current filesystem, then rename() is used. 

777 Otherwise, src is copied to the destination and then removed. Symlinks are 

778 recreated under the new name if os.rename() fails because of cross 

779 filesystem renames. 

780 

781 The optional `copy_function` argument is a callable that will be used 

782 to copy the source or it will be delegated to `copytree`. 

783 By default, copy2() is used, but any function that supports the same 

784 signature (like copy()) can be used. 

785 

786 A lot more could be done here... A look at a mv.c shows a lot of 

787 the issues this implementation glosses over. 

788 

789 """ 

790 sys.audit("shutil.move", src, dst) 

791 real_dst = dst 

792 if os.path.isdir(dst): 

793 if _samefile(src, dst): 

794 # We might be on a case insensitive filesystem, 

795 # perform the rename anyway. 

796 os.rename(src, dst) 

797 return 

798 

799 # Using _basename instead of os.path.basename is important, as we must 

800 # ignore any trailing slash to avoid the basename returning '' 

801 real_dst = os.path.join(dst, _basename(src)) 

802 

803 if os.path.exists(real_dst): 

804 raise Error("Destination path '%s' already exists" % real_dst) 

805 try: 

806 os.rename(src, real_dst) 

807 except OSError: 

808 if os.path.islink(src): 

809 linkto = os.readlink(src) 

810 os.symlink(linkto, real_dst) 

811 os.unlink(src) 

812 elif os.path.isdir(src): 

813 if _destinsrc(src, dst): 

814 raise Error("Cannot move a directory '%s' into itself" 

815 " '%s'." % (src, dst)) 

816 if (_is_immutable(src) 

817 or (not os.access(src, os.W_OK) and os.listdir(src) 

818 and sys.platform == 'darwin')): 

819 raise PermissionError("Cannot move the non-empty directory " 

820 "'%s': Lacking write permission to '%s'." 

821 % (src, src)) 

822 copytree(src, real_dst, copy_function=copy_function, 

823 symlinks=True) 

824 rmtree(src) 

825 else: 

826 copy_function(src, real_dst) 

827 os.unlink(src) 

828 return real_dst 

829 

830def _destinsrc(src, dst): 

831 src = os.path.abspath(src) 

832 dst = os.path.abspath(dst) 

833 if not src.endswith(os.path.sep): 

834 src += os.path.sep 

835 if not dst.endswith(os.path.sep): 

836 dst += os.path.sep 

837 return dst.startswith(src) 

838 

839def _is_immutable(src): 

840 st = _stat(src) 

841 immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] 

842 return hasattr(st, 'st_flags') and st.st_flags in immutable_states 

843 

844def _get_gid(name): 

845 """Returns a gid, given a group name.""" 

846 if getgrnam is None or name is None: 

847 return None 

848 try: 

849 result = getgrnam(name) 

850 except KeyError: 

851 result = None 

852 if result is not None: 

853 return result[2] 

854 return None 

855 

856def _get_uid(name): 

857 """Returns an uid, given a user name.""" 

858 if getpwnam is None or name is None: 

859 return None 

860 try: 

861 result = getpwnam(name) 

862 except KeyError: 

863 result = None 

864 if result is not None: 

865 return result[2] 

866 return None 

867 

868def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, 

869 owner=None, group=None, logger=None): 

870 """Create a (possibly compressed) tar file from all the files under 

871 'base_dir'. 

872 

873 'compress' must be "gzip" (the default), "bzip2", "xz", or None. 

874 

875 'owner' and 'group' can be used to define an owner and a group for the 

876 archive that is being built. If not provided, the current owner and group 

877 will be used. 

878 

879 The output tar file will be named 'base_name' + ".tar", possibly plus 

880 the appropriate compression extension (".gz", ".bz2", or ".xz"). 

881 

882 Returns the output filename. 

883 """ 

884 if compress is None: 

885 tar_compression = '' 

886 elif _ZLIB_SUPPORTED and compress == 'gzip': 

887 tar_compression = 'gz' 

888 elif _BZ2_SUPPORTED and compress == 'bzip2': 

889 tar_compression = 'bz2' 

890 elif _LZMA_SUPPORTED and compress == 'xz': 

891 tar_compression = 'xz' 

892 else: 

893 raise ValueError("bad value for 'compress', or compression format not " 

894 "supported : {0}".format(compress)) 

895 

896 import tarfile # late import for breaking circular dependency 

897 

898 compress_ext = '.' + tar_compression if compress else '' 

899 archive_name = base_name + '.tar' + compress_ext 

900 archive_dir = os.path.dirname(archive_name) 

901 

902 if archive_dir and not os.path.exists(archive_dir): 

903 if logger is not None: 

904 logger.info("creating %s", archive_dir) 

905 if not dry_run: 

906 os.makedirs(archive_dir) 

907 

908 # creating the tarball 

909 if logger is not None: 

910 logger.info('Creating tar archive') 

911 

912 uid = _get_uid(owner) 

913 gid = _get_gid(group) 

914 

915 def _set_uid_gid(tarinfo): 

916 if gid is not None: 

917 tarinfo.gid = gid 

918 tarinfo.gname = group 

919 if uid is not None: 

920 tarinfo.uid = uid 

921 tarinfo.uname = owner 

922 return tarinfo 

923 

924 if not dry_run: 

925 tar = tarfile.open(archive_name, 'w|%s' % tar_compression) 

926 try: 

927 tar.add(base_dir, filter=_set_uid_gid) 

928 finally: 

929 tar.close() 

930 

931 return archive_name 

932 

933def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): 

934 """Create a zip file from all the files under 'base_dir'. 

935 

936 The output zip file will be named 'base_name' + ".zip". Returns the 

937 name of the output zip file. 

938 """ 

939 import zipfile # late import for breaking circular dependency 

940 

941 zip_filename = base_name + ".zip" 

942 archive_dir = os.path.dirname(base_name) 

943 

944 if archive_dir and not os.path.exists(archive_dir): 

945 if logger is not None: 

946 logger.info("creating %s", archive_dir) 

947 if not dry_run: 

948 os.makedirs(archive_dir) 

949 

950 if logger is not None: 

951 logger.info("creating '%s' and adding '%s' to it", 

952 zip_filename, base_dir) 

953 

954 if not dry_run: 

955 with zipfile.ZipFile(zip_filename, "w", 

956 compression=zipfile.ZIP_DEFLATED) as zf: 

957 path = os.path.normpath(base_dir) 

958 if path != os.curdir: 

959 zf.write(path, path) 

960 if logger is not None: 

961 logger.info("adding '%s'", path) 

962 for dirpath, dirnames, filenames in os.walk(base_dir): 

963 for name in sorted(dirnames): 

964 path = os.path.normpath(os.path.join(dirpath, name)) 

965 zf.write(path, path) 

966 if logger is not None: 

967 logger.info("adding '%s'", path) 

968 for name in filenames: 

969 path = os.path.normpath(os.path.join(dirpath, name)) 

970 if os.path.isfile(path): 

971 zf.write(path, path) 

972 if logger is not None: 

973 logger.info("adding '%s'", path) 

974 

975 return zip_filename 

976 

977_ARCHIVE_FORMATS = { 

978 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), 

979} 

980 

981if _ZLIB_SUPPORTED: 

982 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')], 

983 "gzip'ed tar-file") 

984 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file") 

985 

986if _BZ2_SUPPORTED: 

987 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], 

988 "bzip2'ed tar-file") 

989 

990if _LZMA_SUPPORTED: 

991 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')], 

992 "xz'ed tar-file") 

993 

994def get_archive_formats(): 

995 """Returns a list of supported formats for archiving and unarchiving. 

996 

997 Each element of the returned sequence is a tuple (name, description) 

998 """ 

999 formats = [(name, registry[2]) for name, registry in 

1000 _ARCHIVE_FORMATS.items()] 

1001 formats.sort() 

1002 return formats 

1003 

1004def register_archive_format(name, function, extra_args=None, description=''): 

1005 """Registers an archive format. 

1006 

1007 name is the name of the format. function is the callable that will be 

1008 used to create archives. If provided, extra_args is a sequence of 

1009 (name, value) tuples that will be passed as arguments to the callable. 

1010 description can be provided to describe the format, and will be returned 

1011 by the get_archive_formats() function. 

1012 """ 

1013 if extra_args is None: 

1014 extra_args = [] 

1015 if not callable(function): 

1016 raise TypeError('The %s object is not callable' % function) 

1017 if not isinstance(extra_args, (tuple, list)): 

1018 raise TypeError('extra_args needs to be a sequence') 

1019 for element in extra_args: 

1020 if not isinstance(element, (tuple, list)) or len(element) !=2: 

1021 raise TypeError('extra_args elements are : (arg_name, value)') 

1022 

1023 _ARCHIVE_FORMATS[name] = (function, extra_args, description) 

1024 

1025def unregister_archive_format(name): 

1026 del _ARCHIVE_FORMATS[name] 

1027 

1028def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, 

1029 dry_run=0, owner=None, group=None, logger=None): 

1030 """Create an archive file (eg. zip or tar). 

1031 

1032 'base_name' is the name of the file to create, minus any format-specific 

1033 extension; 'format' is the archive format: one of "zip", "tar", "gztar", 

1034 "bztar", or "xztar". Or any other registered format. 

1035 

1036 'root_dir' is a directory that will be the root directory of the 

1037 archive; ie. we typically chdir into 'root_dir' before creating the 

1038 archive. 'base_dir' is the directory where we start archiving from; 

1039 ie. 'base_dir' will be the common prefix of all files and 

1040 directories in the archive. 'root_dir' and 'base_dir' both default 

1041 to the current directory. Returns the name of the archive file. 

1042 

1043 'owner' and 'group' are used when creating a tar archive. By default, 

1044 uses the current owner and group. 

1045 """ 

1046 sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) 

1047 save_cwd = os.getcwd() 

1048 if root_dir is not None: 

1049 if logger is not None: 

1050 logger.debug("changing into '%s'", root_dir) 

1051 base_name = os.path.abspath(base_name) 

1052 if not dry_run: 

1053 os.chdir(root_dir) 

1054 

1055 if base_dir is None: 

1056 base_dir = os.curdir 

1057 

1058 kwargs = {'dry_run': dry_run, 'logger': logger} 

1059 

1060 try: 

1061 format_info = _ARCHIVE_FORMATS[format] 

1062 except KeyError: 

1063 raise ValueError("unknown archive format '%s'" % format) from None 

1064 

1065 func = format_info[0] 

1066 for arg, val in format_info[1]: 

1067 kwargs[arg] = val 

1068 

1069 if format != 'zip': 

1070 kwargs['owner'] = owner 

1071 kwargs['group'] = group 

1072 

1073 try: 

1074 filename = func(base_name, base_dir, **kwargs) 

1075 finally: 

1076 if root_dir is not None: 

1077 if logger is not None: 

1078 logger.debug("changing back to '%s'", save_cwd) 

1079 os.chdir(save_cwd) 

1080 

1081 return filename 

1082 

1083 

1084def get_unpack_formats(): 

1085 """Returns a list of supported formats for unpacking. 

1086 

1087 Each element of the returned sequence is a tuple 

1088 (name, extensions, description) 

1089 """ 

1090 formats = [(name, info[0], info[3]) for name, info in 

1091 _UNPACK_FORMATS.items()] 

1092 formats.sort() 

1093 return formats 

1094 

1095def _check_unpack_options(extensions, function, extra_args): 

1096 """Checks what gets registered as an unpacker.""" 

1097 # first make sure no other unpacker is registered for this extension 

1098 existing_extensions = {} 

1099 for name, info in _UNPACK_FORMATS.items(): 

1100 for ext in info[0]: 

1101 existing_extensions[ext] = name 

1102 

1103 for extension in extensions: 

1104 if extension in existing_extensions: 

1105 msg = '%s is already registered for "%s"' 

1106 raise RegistryError(msg % (extension, 

1107 existing_extensions[extension])) 

1108 

1109 if not callable(function): 

1110 raise TypeError('The registered function must be a callable') 

1111 

1112 

1113def register_unpack_format(name, extensions, function, extra_args=None, 

1114 description=''): 

1115 """Registers an unpack format. 

1116 

1117 `name` is the name of the format. `extensions` is a list of extensions 

1118 corresponding to the format. 

1119 

1120 `function` is the callable that will be 

1121 used to unpack archives. The callable will receive archives to unpack. 

1122 If it's unable to handle an archive, it needs to raise a ReadError 

1123 exception. 

1124 

1125 If provided, `extra_args` is a sequence of 

1126 (name, value) tuples that will be passed as arguments to the callable. 

1127 description can be provided to describe the format, and will be returned 

1128 by the get_unpack_formats() function. 

1129 """ 

1130 if extra_args is None: 

1131 extra_args = [] 

1132 _check_unpack_options(extensions, function, extra_args) 

1133 _UNPACK_FORMATS[name] = extensions, function, extra_args, description 

1134 

1135def unregister_unpack_format(name): 

1136 """Removes the pack format from the registry.""" 

1137 del _UNPACK_FORMATS[name] 

1138 

1139def _ensure_directory(path): 

1140 """Ensure that the parent directory of `path` exists""" 

1141 dirname = os.path.dirname(path) 

1142 if not os.path.isdir(dirname): 

1143 os.makedirs(dirname) 

1144 

1145def _unpack_zipfile(filename, extract_dir): 

1146 """Unpack zip `filename` to `extract_dir` 

1147 """ 

1148 import zipfile # late import for breaking circular dependency 

1149 

1150 if not zipfile.is_zipfile(filename): 

1151 raise ReadError("%s is not a zip file" % filename) 

1152 

1153 zip = zipfile.ZipFile(filename) 

1154 try: 

1155 for info in zip.infolist(): 

1156 name = info.filename 

1157 

1158 # don't extract absolute paths or ones with .. in them 

1159 if name.startswith('/') or '..' in name: 

1160 continue 

1161 

1162 target = os.path.join(extract_dir, *name.split('/')) 

1163 if not target: 

1164 continue 

1165 

1166 _ensure_directory(target) 

1167 if not name.endswith('/'): 

1168 # file 

1169 data = zip.read(info.filename) 

1170 f = open(target, 'wb') 

1171 try: 

1172 f.write(data) 

1173 finally: 

1174 f.close() 

1175 del data 

1176 finally: 

1177 zip.close() 

1178 

1179def _unpack_tarfile(filename, extract_dir): 

1180 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 

1181 """ 

1182 import tarfile # late import for breaking circular dependency 

1183 try: 

1184 tarobj = tarfile.open(filename) 

1185 except tarfile.TarError: 

1186 raise ReadError( 

1187 "%s is not a compressed or uncompressed tar file" % filename) 

1188 try: 

1189 tarobj.extractall(extract_dir) 

1190 finally: 

1191 tarobj.close() 

1192 

1193_UNPACK_FORMATS = { 

1194 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 

1195 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 

1196} 

1197 

1198if _ZLIB_SUPPORTED: 

1199 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [], 

1200 "gzip'ed tar-file") 

1201 

1202if _BZ2_SUPPORTED: 

1203 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [], 

1204 "bzip2'ed tar-file") 

1205 

1206if _LZMA_SUPPORTED: 

1207 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [], 

1208 "xz'ed tar-file") 

1209 

1210def _find_unpack_format(filename): 

1211 for name, info in _UNPACK_FORMATS.items(): 

1212 for extension in info[0]: 

1213 if filename.endswith(extension): 

1214 return name 

1215 return None 

1216 

1217def unpack_archive(filename, extract_dir=None, format=None): 

1218 """Unpack an archive. 

1219 

1220 `filename` is the name of the archive. 

1221 

1222 `extract_dir` is the name of the target directory, where the archive 

1223 is unpacked. If not provided, the current working directory is used. 

1224 

1225 `format` is the archive format: one of "zip", "tar", "gztar", "bztar", 

1226 or "xztar". Or any other registered format. If not provided, 

1227 unpack_archive will use the filename extension and see if an unpacker 

1228 was registered for that extension. 

1229 

1230 In case none is found, a ValueError is raised. 

1231 """ 

1232 sys.audit("shutil.unpack_archive", filename, extract_dir, format) 

1233 

1234 if extract_dir is None: 

1235 extract_dir = os.getcwd() 

1236 

1237 extract_dir = os.fspath(extract_dir) 

1238 filename = os.fspath(filename) 

1239 

1240 if format is not None: 

1241 try: 

1242 format_info = _UNPACK_FORMATS[format] 

1243 except KeyError: 

1244 raise ValueError("Unknown unpack format '{0}'".format(format)) from None 

1245 

1246 func = format_info[1] 

1247 func(filename, extract_dir, **dict(format_info[2])) 

1248 else: 

1249 # we need to look at the registered unpackers supported extensions 

1250 format = _find_unpack_format(filename) 

1251 if format is None: 

1252 raise ReadError("Unknown archive format '{0}'".format(filename)) 

1253 

1254 func = _UNPACK_FORMATS[format][1] 

1255 kwargs = dict(_UNPACK_FORMATS[format][2]) 

1256 func(filename, extract_dir, **kwargs) 

1257 

1258 

1259if hasattr(os, 'statvfs'): 

1260 

1261 __all__.append('disk_usage') 

1262 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 

1263 _ntuple_diskusage.total.__doc__ = 'Total space in bytes' 

1264 _ntuple_diskusage.used.__doc__ = 'Used space in bytes' 

1265 _ntuple_diskusage.free.__doc__ = 'Free space in bytes' 

1266 

1267 def disk_usage(path): 

1268 """Return disk usage statistics about the given path. 

1269 

1270 Returned value is a named tuple with attributes 'total', 'used' and 

1271 'free', which are the amount of total, used and free space, in bytes. 

1272 """ 

1273 st = os.statvfs(path) 

1274 free = st.f_bavail * st.f_frsize 

1275 total = st.f_blocks * st.f_frsize 

1276 used = (st.f_blocks - st.f_bfree) * st.f_frsize 

1277 return _ntuple_diskusage(total, used, free) 

1278 

1279elif _WINDOWS: 

1280 

1281 __all__.append('disk_usage') 

1282 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 

1283 

1284 def disk_usage(path): 

1285 """Return disk usage statistics about the given path. 

1286 

1287 Returned values is a named tuple with attributes 'total', 'used' and 

1288 'free', which are the amount of total, used and free space, in bytes. 

1289 """ 

1290 total, free = nt._getdiskusage(path) 

1291 used = total - free 

1292 return _ntuple_diskusage(total, used, free) 

1293 

1294 

1295def chown(path, user=None, group=None): 

1296 """Change owner user and group of the given path. 

1297 

1298 user and group can be the uid/gid or the user/group names, and in that case, 

1299 they are converted to their respective uid/gid. 

1300 """ 

1301 sys.audit('shutil.chown', path, user, group) 

1302 

1303 if user is None and group is None: 

1304 raise ValueError("user and/or group must be set") 

1305 

1306 _user = user 

1307 _group = group 

1308 

1309 # -1 means don't change it 

1310 if user is None: 

1311 _user = -1 

1312 # user can either be an int (the uid) or a string (the system username) 

1313 elif isinstance(user, str): 

1314 _user = _get_uid(user) 

1315 if _user is None: 

1316 raise LookupError("no such user: {!r}".format(user)) 

1317 

1318 if group is None: 

1319 _group = -1 

1320 elif not isinstance(group, int): 

1321 _group = _get_gid(group) 

1322 if _group is None: 

1323 raise LookupError("no such group: {!r}".format(group)) 

1324 

1325 os.chown(path, _user, _group) 

1326 

1327def get_terminal_size(fallback=(80, 24)): 

1328 """Get the size of the terminal window. 

1329 

1330 For each of the two dimensions, the environment variable, COLUMNS 

1331 and LINES respectively, is checked. If the variable is defined and 

1332 the value is a positive integer, it is used. 

1333 

1334 When COLUMNS or LINES is not defined, which is the common case, 

1335 the terminal connected to sys.__stdout__ is queried 

1336 by invoking os.get_terminal_size. 

1337 

1338 If the terminal size cannot be successfully queried, either because 

1339 the system doesn't support querying, or because we are not 

1340 connected to a terminal, the value given in fallback parameter 

1341 is used. Fallback defaults to (80, 24) which is the default 

1342 size used by many terminal emulators. 

1343 

1344 The value returned is a named tuple of type os.terminal_size. 

1345 """ 

1346 # columns, lines are the working values 

1347 try: 

1348 columns = int(os.environ['COLUMNS']) 

1349 except (KeyError, ValueError): 

1350 columns = 0 

1351 

1352 try: 

1353 lines = int(os.environ['LINES']) 

1354 except (KeyError, ValueError): 

1355 lines = 0 

1356 

1357 # only query if necessary 

1358 if columns <= 0 or lines <= 0: 

1359 try: 

1360 size = os.get_terminal_size(sys.__stdout__.fileno()) 

1361 except (AttributeError, ValueError, OSError): 

1362 # stdout is None, closed, detached, or not a terminal, or 

1363 # os.get_terminal_size() is unsupported 

1364 size = os.terminal_size(fallback) 

1365 if columns <= 0: 

1366 columns = size.columns 

1367 if lines <= 0: 

1368 lines = size.lines 

1369 

1370 return os.terminal_size((columns, lines)) 

1371 

1372 

1373# Check that a given file can be accessed with the correct mode. 

1374# Additionally check that `file` is not a directory, as on Windows 

1375# directories pass the os.access check. 

1376def _access_check(fn, mode): 

1377 return (os.path.exists(fn) and os.access(fn, mode) 

1378 and not os.path.isdir(fn)) 

1379 

1380 

1381def which(cmd, mode=os.F_OK | os.X_OK, path=None): 

1382 """Given a command, mode, and a PATH string, return the path which 

1383 conforms to the given mode on the PATH, or None if there is no such 

1384 file. 

1385 

1386 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 

1387 of os.environ.get("PATH"), or can be overridden with a custom search 

1388 path. 

1389 

1390 """ 

1391 # If we're given a path with a directory part, look it up directly rather 

1392 # than referring to PATH directories. This includes checking relative to the 

1393 # current directory, e.g. ./script 

1394 if os.path.dirname(cmd): 

1395 if _access_check(cmd, mode): 

1396 return cmd 

1397 return None 

1398 

1399 use_bytes = isinstance(cmd, bytes) 

1400 

1401 if path is None: 

1402 path = os.environ.get("PATH", None) 

1403 if path is None: 

1404 try: 

1405 path = os.confstr("CS_PATH") 

1406 except (AttributeError, ValueError): 

1407 # os.confstr() or CS_PATH is not available 

1408 path = os.defpath 

1409 # bpo-35755: Don't use os.defpath if the PATH environment variable is 

1410 # set to an empty string 

1411 

1412 # PATH='' doesn't match, whereas PATH=':' looks in the current directory 

1413 if not path: 

1414 return None 

1415 

1416 if use_bytes: 

1417 path = os.fsencode(path) 

1418 path = path.split(os.fsencode(os.pathsep)) 

1419 else: 

1420 path = os.fsdecode(path) 

1421 path = path.split(os.pathsep) 

1422 

1423 if sys.platform == "win32": 

1424 # The current directory takes precedence on Windows. 

1425 curdir = os.curdir 

1426 if use_bytes: 

1427 curdir = os.fsencode(curdir) 

1428 if curdir not in path: 

1429 path.insert(0, curdir) 

1430 

1431 # PATHEXT is necessary to check on Windows. 

1432 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT 

1433 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] 

1434 

1435 if use_bytes: 

1436 pathext = [os.fsencode(ext) for ext in pathext] 

1437 # See if the given file matches any of the expected path extensions. 

1438 # This will allow us to short circuit when given "python.exe". 

1439 # If it does match, only test that one, otherwise we have to try 

1440 # others. 

1441 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 

1442 files = [cmd] 

1443 else: 

1444 files = [cmd + ext for ext in pathext] 

1445 else: 

1446 # On other platforms you don't have things like PATHEXT to tell you 

1447 # what file suffixes are executable, so just pass on cmd as-is. 

1448 files = [cmd] 

1449 

1450 seen = set() 

1451 for dir in path: 

1452 normdir = os.path.normcase(dir) 

1453 if not normdir in seen: 

1454 seen.add(normdir) 

1455 for thefile in files: 

1456 name = os.path.join(dir, thefile) 

1457 if _access_check(name, mode): 

1458 return name 

1459 return None