Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/shutil.py: 6%

787 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1"""Utility functions for copying and archiving files and directory trees. 

2 

3XXX The functions here don't copy the resource fork or other metadata on Mac. 

4 

5""" 

6 

7import os 

8import sys 

9import stat 

10import fnmatch 

11import collections 

12import errno 

13 

# Probe the optional compression modules.  Only their availability matters
# here, so each module is removed from the namespace right after importing.
try:
    import zlib
    del zlib
    _ZLIB_SUPPORTED = True
except ImportError:
    _ZLIB_SUPPORTED = False

try:
    import bz2
    del bz2
    _BZ2_SUPPORTED = True
except ImportError:
    _BZ2_SUPPORTED = False

try:
    import lzma
    del lzma
    _LZMA_SUPPORTED = True
except ImportError:
    _LZMA_SUPPORTED = False

# User/group database lookups; None when the module cannot be imported.
try:
    from pwd import getpwnam
except ImportError:
    getpwnam = None

try:
    from grp import getgrnam
except ImportError:
    getgrnam = None

_WINDOWS = os.name == 'nt'
# Exactly one of these is rebound to the real module on posix / Windows;
# both stay None on any other platform.
posix = nt = None
if os.name == 'posix':
    import posix
elif _WINDOWS:
    import nt

# Default chunk size for buffered copies: 1 MiB on Windows, 64 KiB elsewhere.
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS

55 

# Names exported by ``from shutil import *``.
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
# disk_usage is added later, if available on the platform

65 

class Error(OSError):
    """Base class for the errors raised by this module's copy helpers."""


class SameFileError(Error):
    """Raised when source and destination are the same file."""


class SpecialFileError(OSError):
    """Raised when trying to do a kind of operation (e.g. copying) which is
    not supported on a special file (e.g. a named pipe)"""


class ExecError(OSError):
    """Raised when a command could not be executed"""


class ReadError(OSError):
    """Raised when an archive cannot be read"""


class RegistryError(Exception):
    """Raised when a registry operation with the archiving
    and unpacking registries fails"""


class _GiveupOnFastCopy(Exception):
    """Raised as a signal to fallback on using raw read()/write()
    file copy when fast-copy functions fail to do so.
    """

90 

def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy a regular file content or metadata by using high-performance
    fcopyfile(3) syscall (macOS).

    *fsrc* and *fdst* are open file objects; *flags* is handed unchanged to
    posix._fcopyfile().  Raises _GiveupOnFastCopy when the caller should
    fall back to a plain read()/write() copy, and OSError otherwise.
    """
    try:
        infd = fsrc.fileno()
        outfd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    try:
        posix._fcopyfile(infd, outfd, flags)
    except OSError as err:
        # Attach both file names so the error message is more informative.
        err.filename = fsrc.name
        err.filename2 = fdst.name
        if err.errno in {errno.EINVAL, errno.ENOTSUP}:
            raise _GiveupOnFastCopy(err)
        else:
            raise err from None

110 

def _fastcopy_sendfile(fsrc, fdst):
    """Copy data from one regular mmap-like fd to another by using
    high-performance sendfile(2) syscall.
    This should work on Linux >= 2.6.33 only.

    Raises _GiveupOnFastCopy when the caller should fall back to a plain
    read()/write() copy, and OSError for genuine I/O failures.
    """
    # Note: copyfileobj() is left alone in order to not introduce any
    # unexpected breakage. Possible risks by using zero-copy calls
    # in copyfileobj() are:
    # - fdst cannot be open in "a"(ppend) mode
    # - fsrc and fdst may be open in "t"(ext) mode
    # - fsrc may be a BufferedReader (which hides unread data in a buffer),
    #   GzipFile (which decompresses data), HTTPResponse (which decodes
    #   chunks).
    # - possibly others (e.g. encrypted fs/partition?)
    global _USE_CP_SENDFILE
    try:
        infd = fsrc.fileno()
        outfd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    # Hopefully the whole file will be copied in a single call.
    # sendfile() is called in a loop 'till EOF is reached (0 return)
    # so a bufsize smaller or bigger than the actual file size
    # should not make any difference, also in case the file content
    # changes while being copied.
    try:
        blocksize = max(os.fstat(infd).st_size, 2 ** 23)  # min 8MiB
    except OSError:
        blocksize = 2 ** 27  # 128MiB
    # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
    # see bpo-38319.
    if sys.maxsize < 2 ** 32:
        blocksize = min(blocksize, 2 ** 30)

    offset = 0
    while True:
        try:
            sent = os.sendfile(outfd, infd, offset, blocksize)
        except OSError as err:
            # ...in order to have a more informative exception.
            err.filename = fsrc.name
            err.filename2 = fdst.name

            if err.errno == errno.ENOTSOCK:
                # sendfile() on this platform (probably Linux < 2.6.33)
                # does not support copies between regular files (only
                # sockets).  Disable the fast path for later calls too.
                _USE_CP_SENDFILE = False
                raise _GiveupOnFastCopy(err)

            if err.errno == errno.ENOSPC:  # filesystem is full
                raise err from None

            # Give up on first call and if no data was copied.
            if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
                raise _GiveupOnFastCopy(err)

            raise err
        else:
            if sent == 0:
                break  # EOF
            offset += sent

174 

def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
    """readinto()/memoryview() based variant of copyfileobj().
    *fsrc* must support readinto() method and both files must be
    open in binary mode.

    *length* is the size in bytes of the single reusable buffer.
    """
    # Localize variable access to minimize overhead.
    fsrc_readinto = fsrc.readinto
    fdst_write = fdst.write
    with memoryview(bytearray(length)) as mv:
        while True:
            n = fsrc_readinto(mv)
            if not n:
                break
            elif n < length:
                # Short read: write only the filled prefix of the buffer.
                with mv[:n] as smv:
                    fdst_write(smv)
            else:
                fdst_write(mv)

193 

def copyfileobj(fsrc, fdst, length=0):
    """copy data from file-like object fsrc to file-like object fdst

    Data is moved in chunks of *length*; a false value (the default 0)
    selects COPY_BUFSIZE.  Copying stops at the first empty read.
    """
    if not length:
        length = COPY_BUFSIZE
    # Localize variable access to minimize overhead.
    fsrc_read = fsrc.read
    fdst_write = fdst.write
    while True:
        buf = fsrc_read(length)
        if not buf:
            break
        fdst_write(buf)

206 

207def _samefile(src, dst): 

208 # Macintosh, Unix. 

209 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): 

210 try: 

211 return os.path.samestat(src.stat(), os.stat(dst)) 

212 except OSError: 

213 return False 

214 

215 if hasattr(os.path, 'samefile'): 

216 try: 

217 return os.path.samefile(src, dst) 

218 except OSError: 

219 return False 

220 

221 # All other platforms: check for same pathname. 

222 return (os.path.normcase(os.path.abspath(src)) == 

223 os.path.normcase(os.path.abspath(dst))) 

224 

225def _stat(fn): 

226 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) 

227 

228def _islink(fn): 

229 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) 

230 

def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst in the most efficient way possible.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Returns dst.  Raises SameFileError when src and dst are the same file
    and SpecialFileError when either one is a named pipe.
    """
    sys.audit("shutil.copyfile", src, dst)

    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    file_size = 0
    for i, fn in enumerate([src, dst]):
        try:
            st = _stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                fn = fn.path if isinstance(fn, os.DirEntry) else fn
                raise SpecialFileError("`%s` is a named pipe" % fn)
            # Only the source size (i == 0) is needed, and only on Windows
            # where it sizes the readinto() buffer below.
            if _WINDOWS and i == 0:
                file_size = st.st_size

    if not follow_symlinks and _islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
            # macOS
            if _HAS_FCOPYFILE:
                try:
                    _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
                    return dst
                except _GiveupOnFastCopy:
                    pass
            # Linux
            elif _USE_CP_SENDFILE:
                try:
                    _fastcopy_sendfile(fsrc, fdst)
                    return dst
                except _GiveupOnFastCopy:
                    pass
            # Windows, see:
            # https://github.com/python/cpython/pull/7160#discussion_r195405230
            elif _WINDOWS and file_size > 0:
                _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
                return dst

            # Generic fallback, also reached when a fast path gave up.
            copyfileobj(fsrc, fdst)

    return dst

285 

def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    """
    sys.audit("shutil.copymode", src, dst)

    if not follow_symlinks and _islink(src) and os.path.islink(dst):
        if hasattr(os, 'lchmod'):
            stat_func, chmod_func = os.lstat, os.lchmod
        else:
            # No way to change a symlink's own mode on this platform.
            return
    else:
        stat_func, chmod_func = _stat, os.chmod

    st = stat_func(src)
    chmod_func(dst, stat.S_IMODE(st.st_mode))

306 

307if hasattr(os, 'listxattr'): 

308 def _copyxattr(src, dst, *, follow_symlinks=True): 

309 """Copy extended filesystem attributes from `src` to `dst`. 

310 

311 Overwrite existing attributes. 

312 

313 If `follow_symlinks` is false, symlinks won't be followed. 

314 

315 """ 

316 

317 try: 

318 names = os.listxattr(src, follow_symlinks=follow_symlinks) 

319 except OSError as e: 

320 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): 

321 raise 

322 return 

323 for name in names: 

324 try: 

325 value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 

326 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 

327 except OSError as e: 

328 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, 

329 errno.EINVAL): 

330 raise 

331else: 

332 def _copyxattr(*args, **kwargs): 

333 pass 

334 

def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path-like objects or path names given as
    strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    sys.audit("shutil.copystat", src, dst)

    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # follow symlinks (aka don't not follow symlinks)
    follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    if isinstance(src, os.DirEntry):
        st = src.stat(follow_symlinks=follow)
    else:
        st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    _copyxattr(src, dst, follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # if we got a NotImplementedError, it's because
        #   * follow_symlinks=False,
        #   * lchmod() is unavailable, and
        #   * either
        #       * fchmodat() is unavailable or
        #       * fchmodat() doesn't implement AT_SYMLINK_NOFOLLOW.
        #         (it returned ENOTSUP.)
        # therefore we're out of options--we simply cannot chmod the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Ignore "operation not supported" errors; re-raise the rest.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise

400 

def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.

    """
    if os.path.isdir(dst):
        # Copy into the directory, keeping the source's base name.
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst

418 

def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(). Please see the copystat function
    for more information.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".
    """
    if os.path.isdir(dst):
        # Copy into the directory, keeping the source's base name.
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst

435 

def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files.  The returned callable takes
    (path, names) and returns the set of names matching any pattern."""
    def _ignore_patterns(path, names):
        return {name
                for pattern in patterns
                for name in fnmatch.filter(names, pattern)}
    return _ignore_patterns

447 

def _copytree(entries, src, dst, symlinks, ignore, copy_function,
              ignore_dangling_symlinks, dirs_exist_ok=False):
    """Copy the already-scanned *entries* of directory *src* into *dst*.

    Worker behind copytree(); the parameters have the same meaning as
    there.  Failures on individual entries are collected and raised
    together as a single Error once every entry has been attempted.
    Returns *dst*.
    """
    if ignore is not None:
        ignored_names = ignore(os.fspath(src), [x.name for x in entries])
    else:
        ignored_names = set()

    os.makedirs(dst, exist_ok=dirs_exist_ok)
    errors = []
    # The built-in copy functions accept os.DirEntry objects directly;
    # a custom copy_function is only ever given a plain path.
    use_srcentry = copy_function is copy2 or copy_function is copy

    for srcentry in entries:
        if srcentry.name in ignored_names:
            continue
        srcname = os.path.join(src, srcentry.name)
        dstname = os.path.join(dst, srcentry.name)
        srcobj = srcentry if use_srcentry else srcname
        try:
            is_symlink = srcentry.is_symlink()
            if is_symlink and os.name == 'nt':
                # Special check for directory junctions, which appear as
                # symlinks but we want to recurse.
                lstat = srcentry.stat(follow_symlinks=False)
                if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
                    is_symlink = False
            if is_symlink:
                linkto = os.readlink(srcname)
                if symlinks:
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcobj, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on
                    if not os.path.exists(linkto) and ignore_dangling_symlinks:
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    if srcentry.is_dir():
                        # Propagate ignore_dangling_symlinks so the flag
                        # also applies inside nested directories.
                        copytree(srcobj, dstname, symlinks, ignore,
                                 copy_function,
                                 ignore_dangling_symlinks=ignore_dangling_symlinks,
                                 dirs_exist_ok=dirs_exist_ok)
                    else:
                        copy_function(srcobj, dstname)
            elif srcentry.is_dir():
                copytree(srcobj, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks=ignore_dangling_symlinks,
                         dirs_exist_ok=dirs_exist_ok)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcobj, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst

512 

def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False, dirs_exist_ok=False):
    """Recursively copy a directory tree and return the destination directory.

    dirs_exist_ok dictates whether to raise an exception in case dst or any
    missing parent directory already exists.

    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied. If the file pointed by the symlink doesn't
    exist, an exception will be added in the list of errors raised in
    an Error exception at the end of the copy process.

    You can set the optional ignore_dangling_symlinks flag to true if you
    want to silence this exception. Notice that this has no effect on
    platforms that don't support os.symlink.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    The optional copy_function argument is a callable that will be used
    to copy each file. It will be called with the source path and the
    destination path as arguments. By default, copy2() is used, but any
    function that supports the same signature (like copy()) can be used.

    """
    sys.audit("shutil.copytree", src, dst)
    # Materialize the listing so the scandir iterator is closed before
    # the copy starts.
    with os.scandir(src) as itr:
        entries = list(itr)
    return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
                     ignore=ignore, copy_function=copy_function,
                     ignore_dangling_symlinks=ignore_dangling_symlinks,
                     dirs_exist_ok=dirs_exist_ok)

558 

559if hasattr(os.stat_result, 'st_file_attributes'): 

560 # Special handling for directory junctions to make them behave like 

561 # symlinks for shutil.rmtree, since in general they do not appear as 

562 # regular links. 

563 def _rmtree_isdir(entry): 

564 try: 

565 st = entry.stat(follow_symlinks=False) 

566 return (stat.S_ISDIR(st.st_mode) and not 

567 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 

568 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 

569 except OSError: 

570 return False 

571 

572 def _rmtree_islink(path): 

573 try: 

574 st = os.lstat(path) 

575 return (stat.S_ISLNK(st.st_mode) or 

576 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 

577 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 

578 except OSError: 

579 return False 

580else: 

581 def _rmtree_isdir(entry): 

582 try: 

583 return entry.is_dir(follow_symlinks=False) 

584 except OSError: 

585 return False 

586 

587 def _rmtree_islink(path): 

588 return os.path.islink(path) 

589 

# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Remove the tree rooted at *path* using path-based os calls.

    Every failure is reported as onerror(func, path, exc_info) instead of
    being raised here; whether it propagates is up to *onerror*.
    """
    try:
        with os.scandir(path) as scandir_it:
            entries = list(scandir_it)
    except OSError:
        onerror(os.scandir, path, sys.exc_info())
        entries = []
    for entry in entries:
        fullname = entry.path
        if _rmtree_isdir(entry):
            try:
                if entry.is_symlink():
                    # This can only happen if someone replaces
                    # a directory with a symlink after the call to
                    # os.scandir or entry.is_dir above.
                    raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(os.path.islink, fullname, sys.exc_info())
                continue
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                os.unlink(fullname)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())

620 

621# Version using fd-based APIs to protect against races 

622def _rmtree_safe_fd(topfd, path, onerror): 

623 try: 

624 with os.scandir(topfd) as scandir_it: 

625 entries = list(scandir_it) 

626 except OSError as err: 

627 err.filename = path 

628 onerror(os.scandir, path, sys.exc_info()) 

629 return 

630 for entry in entries: 

631 fullname = os.path.join(path, entry.name) 

632 try: 

633 is_dir = entry.is_dir(follow_symlinks=False) 

634 except OSError: 

635 is_dir = False 

636 else: 

637 if is_dir: 

638 try: 

639 orig_st = entry.stat(follow_symlinks=False) 

640 is_dir = stat.S_ISDIR(orig_st.st_mode) 

641 except OSError: 

642 onerror(os.lstat, fullname, sys.exc_info()) 

643 continue 

644 if is_dir: 

645 try: 

646 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) 

647 except OSError: 

648 onerror(os.open, fullname, sys.exc_info()) 

649 else: 

650 try: 

651 if os.path.samestat(orig_st, os.fstat(dirfd)): 

652 _rmtree_safe_fd(dirfd, fullname, onerror) 

653 try: 

654 os.rmdir(entry.name, dir_fd=topfd) 

655 except OSError: 

656 onerror(os.rmdir, fullname, sys.exc_info()) 

657 else: 

658 try: 

659 # This can only happen if someone replaces 

660 # a directory with a symlink after the call to 

661 # os.scandir or stat.S_ISDIR above. 

662 raise OSError("Cannot call rmtree on a symbolic " 

663 "link") 

664 except OSError: 

665 onerror(os.path.islink, fullname, sys.exc_info()) 

666 finally: 

667 os.close(dirfd) 

668 else: 

669 try: 

670 os.unlink(entry.name, dir_fd=topfd) 

671 except OSError: 

672 onerror(os.unlink, fullname, sys.exc_info()) 

673 

674_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 

675 os.supports_dir_fd and 

676 os.scandir in os.supports_fd and 

677 os.stat in os.supports_follow_symlinks) 

678 

def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    sys.audit("shutil.rmtree", path)
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Called from inside an except block: re-raise that exception.
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY)
        except Exception:
            # Report the function that actually failed.
            onerror(os.open, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.rmdir(path)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            os.close(fd)
    else:
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, path, sys.exc_info())
            # can't continue even if onerror hook returns
            return
        return _rmtree_unsafe(path, onerror)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions

742 

743def _basename(path): 

744 # A basename() variant which first strips the trailing slash, if present. 

745 # Thus we always get the last component of the path, even for directories. 

746 sep = os.path.sep + (os.path.altsep or '') 

747 return os.path.basename(path.rstrip(sep)) 

748 

def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed (e.g. across filesystems): copy, then delete.
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst

805 

806def _destinsrc(src, dst): 

807 src = os.path.abspath(src) 

808 dst = os.path.abspath(dst) 

809 if not src.endswith(os.path.sep): 

810 src += os.path.sep 

811 if not dst.endswith(os.path.sep): 

812 dst += os.path.sep 

813 return dst.startswith(src) 

814 

def _get_gid(name):
    """Returns a gid, given a group name.

    Returns None when *name* is None, when getgrnam is unavailable, or
    when the lookup raises KeyError.
    """
    if getgrnam is None or name is None:
        return None
    try:
        entry = getgrnam(name)
    except KeyError:
        return None
    # Index 2 of the returned record is used as the gid.
    return None if entry is None else entry[2]

826 

def _get_uid(name):
    """Returns an uid, given a user name.

    Returns None when *name* is None, when getpwnam is unavailable, or
    when the lookup raises KeyError.
    """
    if getpwnam is None or name is None:
        return None
    try:
        entry = getpwnam(name)
    except KeyError:
        return None
    # Index 2 of the returned record is used as the uid.
    return None if entry is None else entry[2]

838 

def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    With 'dry_run' true, nothing is written and only the name is computed.
    Raises ValueError for an unknown or unsupported 'compress' value.

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    compress_ext = '.' + tar_compression if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # tarfile filter: override ownership only for the ids that resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        try:
            tar.add(base_dir, filter=_set_uid_gid)
        finally:
            tar.close()

    return archive_name

903 

904def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): 

905 """Create a zip file from all the files under 'base_dir'. 

906 

907 The output zip file will be named 'base_name' + ".zip". Returns the 

908 name of the output zip file. 

909 """ 

910 import zipfile # late import for breaking circular dependency 

911 

912 zip_filename = base_name + ".zip" 

913 archive_dir = os.path.dirname(base_name) 

914 

915 if archive_dir and not os.path.exists(archive_dir): 

916 if logger is not None: 

917 logger.info("creating %s", archive_dir) 

918 if not dry_run: 

919 os.makedirs(archive_dir) 

920 

921 if logger is not None: 

922 logger.info("creating '%s' and adding '%s' to it", 

923 zip_filename, base_dir) 

924 

925 if not dry_run: 

926 with zipfile.ZipFile(zip_filename, "w", 

927 compression=zipfile.ZIP_DEFLATED) as zf: 

928 path = os.path.normpath(base_dir) 

929 if path != os.curdir: 

930 zf.write(path, path) 

931 if logger is not None: 

932 logger.info("adding '%s'", path) 

933 for dirpath, dirnames, filenames in os.walk(base_dir): 

934 for name in sorted(dirnames): 

935 path = os.path.normpath(os.path.join(dirpath, name)) 

936 zf.write(path, path) 

937 if logger is not None: 

938 logger.info("adding '%s'", path) 

939 for name in filenames: 

940 path = os.path.normpath(os.path.join(dirpath, name)) 

941 if os.path.isfile(path): 

942 zf.write(path, path) 

943 if logger is not None: 

944 logger.info("adding '%s'", path) 

945 

946 return zip_filename 

947 

# Registry of archive creators, keyed by format name.  Each value is a
# (function, extra_args, description) tuple; extra_args is a list of
# (keyword, value) pairs handed to the function by make_archive().
# Plain tar is always available; the other formats are registered only
# when the corresponding compression module could be imported.
_ARCHIVE_FORMATS = {
    'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
}

if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS.update(
        gztar=(_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
        zip=(_make_zipfile, [], "ZIP file"),
    )

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS.update(
        bztar=(_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
    )

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS.update(
        xztar=(_make_tarball, [('compress', 'xz')], "xz'ed tar-file"),
    )

964 

def get_archive_formats():
    """Return the formats currently registered for creating archives.

    The result is a list of (name, description) tuples, one per entry of
    the archive-format registry, in sorted order.
    """
    return sorted((name, entry[2])
                  for name, entry in _ARCHIVE_FORMATS.items())

974 

def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, if extra_args is not a
    tuple or list, or if any element of extra_args is not a 2-item tuple
    or list.  An existing registration under the same name is replaced.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        # Wrap the value in a 1-tuple: a bare tuple on the right of '%' is
        # unpacked as the argument list, which garbles the message (or fails
        # with "not all arguments converted") when `function` is a tuple.
        raise TypeError('The %s object is not callable' % (function,))
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)

995 

def unregister_archive_format(name):
    """Remove the archive format called `name` from the registry.

    Raises KeyError if no format is registered under that name.
    """
    _ARCHIVE_FORMATS.pop(name)

998 

def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.  They are passed to the creator of
    every format other than "zip".

    'dry_run' and 'logger' are forwarded to the format's creator function;
    with a true 'dry_run' the working directory is never changed.
    'verbose' is accepted but not used here.

    Raises ValueError if 'format' is not a registered archive format.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)

    # Resolve the format before touching the working directory, so that an
    # unknown format cannot leave the process stranded inside 'root_dir'.
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    func = format_info[0]
    kwargs = {'dry_run': dry_run, 'logger': logger}
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    if base_dir is None:
        base_dir = os.curdir

    # save_cwd stays None unless the working directory was really changed;
    # only then is there anything to restore afterwards.
    save_cwd = None
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        # Make base_name absolute while the original cwd is still in effect.
        base_name = os.path.abspath(base_name)
        if not dry_run:
            save_cwd = os.getcwd()
            os.chdir(root_dir)

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if save_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename

1053 

1054 

def get_unpack_formats():
    """Return the formats currently registered for unpacking.

    The result is a sorted list of (name, extensions, description)
    tuples, one per entry of the unpack-format registry.
    """
    return sorted((name, info[0], info[3])
                  for name, info in _UNPACK_FORMATS.items())

1065 

def _check_unpack_options(extensions, function, extra_args):
    """Validate the pieces of an unpacker registration.

    Raises RegistryError if any of `extensions` already belongs to a
    registered unpack format, and TypeError if `function` is not callable.
    `extra_args` is accepted but not inspected.
    """
    # Map every extension already in the registry to the format owning it.
    claimed = {ext: format_name
               for format_name, info in _UNPACK_FORMATS.items()
               for ext in info[0]}

    for extension in extensions:
        if extension in claimed:
            raise RegistryError('%s is already registered for "%s"'
                                % (extension, claimed[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')

1082 

1083 

def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Add an unpack format to the registry.

    `name` identifies the format and `extensions` lists the file name
    extensions that select it.

    `function` is the callable used to unpack archives.  It receives the
    archive to unpack and must raise a ReadError exception when it cannot
    handle that archive.

    `extra_args`, when given, is a sequence of (name, value) tuples passed
    to the callable as additional arguments.  `description` is a free-form
    text returned by get_unpack_formats().

    The extensions and the callable are validated by
    _check_unpack_options() before anything is stored.
    """
    extra_args = [] if extra_args is None else extra_args
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = (extensions, function, extra_args, description)

1105 

def unregister_unpack_format(name):
    """Remove the unpack format called `name` from the registry.

    Raises KeyError if no format is registered under that name.
    """
    _UNPACK_FORMATS.pop(name)

1109 

1110def _ensure_directory(path): 

1111 """Ensure that the parent directory of `path` exists""" 

1112 dirname = os.path.dirname(path) 

1113 if not os.path.isdir(dirname): 

1114 os.makedirs(dirname) 

1115 

def _unpack_zipfile(filename, extract_dir):
    """Extract the members of the zip archive `filename` into `extract_dir`.

    Raises ReadError when `filename` is not a zip file.  Members whose name
    starts with '/' or contains '..' anywhere are silently skipped.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for member in archive.infolist():
            name = member.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            if name.endswith('/'):
                # directory entry: creating it above was all there is to do
                continue

            # Read the member before opening the target, so a failed read
            # does not leave an empty file behind.
            data = archive.read(member.filename)
            with open(target, 'wb') as out:
                out.write(data)
            del data

1149 

1150def _unpack_tarfile(filename, extract_dir): 

1151 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 

1152 """ 

1153 import tarfile # late import for breaking circular dependency 

1154 try: 

1155 tarobj = tarfile.open(filename) 

1156 except tarfile.TarError: 

1157 raise ReadError( 

1158 "%s is not a compressed or uncompressed tar file" % filename) 

1159 try: 

1160 tarobj.extractall(extract_dir) 

1161 finally: 

1162 tarobj.close() 

1163 

# Registry of unpackers, keyed by format name.  Each value is an
# (extensions, function, extra_args, description) tuple.  Plain tar and
# zip are always registered; the compressed tar variants are added only
# when the corresponding compression module could be imported.
_UNPACK_FORMATS = {
    'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS.update(
        gztar=(['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
    )

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS.update(
        bztar=(['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
               "bzip2'ed tar-file"),
    )

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS.update(
        xztar=(['.tar.xz', '.txz'], _unpack_tarfile, [], "xz'ed tar-file"),
    )

1180 

def _find_unpack_format(filename):
    """Return the name of the first registered unpack format with an
    extension that `filename` ends with, or None when nothing matches."""
    for name, info in _UNPACK_FORMATS.items():
        if any(filename.endswith(extension) for extension in info[0]):
            return name
    return None

1187 

def unpack_archive(filename, extract_dir=None, format=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    Raises ValueError when an explicitly given `format` is not registered,
    and ReadError when no `format` is given and no registered extension
    matches `filename`.
    """
    sys.audit("shutil.unpack_archive", filename, extract_dir, format)

    if extract_dir is None:
        extract_dir = os.getcwd()

    extract_dir = os.fspath(extract_dir)
    filename = os.fspath(filename)

    if format is None:
        # no explicit format: pick the unpacker from the file extension
        format = _find_unpack_format(filename)
        if format is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))
        format_info = _UNPACK_FORMATS[format]
    else:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            raise ValueError("Unknown unpack format '{0}'".format(format)) from None

    # Registry entries are (extensions, function, extra_args, description).
    unpacker = format_info[1]
    unpacker(filename, extract_dir, **dict(format_info[2]))

1228 

1229 

if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        The result is a named tuple with attributes 'total', 'used' and
        'free', each an amount of space in bytes.  All three are derived
        from os.statvfs(path): 'total' from f_blocks, 'used' from
        f_blocks - f_bfree and 'free' from f_bavail, each multiplied by
        f_frsize.
        """
        vfs = os.statvfs(path)
        return _ntuple_diskusage(
            total=vfs.f_blocks * vfs.f_frsize,
            used=(vfs.f_blocks - vfs.f_bfree) * vfs.f_frsize,
            free=vfs.f_bavail * vfs.f_frsize,
        )

elif _WINDOWS:

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        The result is a named tuple with attributes 'total', 'used' and
        'free', each an amount of space in bytes.  'total' and 'free' come
        from nt._getdiskusage(path); 'used' is their difference.
        """
        total, free = nt._getdiskusage(path)
        return _ntuple_diskusage(total, total - free, free)

1264 

1265 

def chown(path, user=None, group=None):
    """Change the owner and/or the group of the given path.

    `user` may be a uid or a user name; a name (a str) is converted to
    its uid.  `group` may be a gid or a group name; any value that is not
    an int is looked up as a group name.  An argument left as None leaves
    that id unchanged.

    Raises ValueError when both `user` and `group` are None, and
    LookupError when a name cannot be resolved.
    """
    sys.audit('shutil.chown', path, user, group)

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    # -1 means don't change it
    uid = -1 if user is None else user
    # user can either be an int (the uid) or a string (the system username)
    if isinstance(user, str):
        uid = _get_uid(user)
        if uid is None:
            raise LookupError("no such user: {!r}".format(user))

    gid = -1 if group is None else group
    if group is not None and not isinstance(group, int):
        gid = _get_gid(group)
        if gid is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, uid, gid)

1297 

def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    Each dimension is first looked up in the environment: COLUMNS for the
    width and LINES for the height.  A variable that is defined and holds
    a positive integer is used as-is.

    For any dimension not settled that way, the terminal connected to
    sys.__stdout__ is queried through os.get_terminal_size.

    If that query fails (stdout is None, closed, detached or not a
    terminal, or querying is unsupported), the corresponding value from
    `fallback` is used instead.  `fallback` defaults to (80, 24).

    The value returned is a named tuple of type os.terminal_size.
    """
    def _from_env(variable):
        # A missing or non-integer variable counts as "not set" (0).
        try:
            return int(os.environ[variable])
        except (KeyError, ValueError):
            return 0

    columns = _from_env('COLUMNS')
    lines = _from_env('LINES')

    # The environment settled both dimensions: no need to query anything.
    if columns > 0 and lines > 0:
        return os.terminal_size((columns, lines))

    try:
        size = os.get_terminal_size(sys.__stdout__.fileno())
    except (AttributeError, ValueError, OSError):
        # stdout is None, closed, detached, or not a terminal, or
        # os.get_terminal_size() is unsupported
        size = os.terminal_size(fallback)

    if columns <= 0:
        columns = size.columns
    if lines <= 0:
        lines = size.lines

    return os.terminal_size((columns, lines))

1342 

1343 

1344# Check that a given file can be accessed with the correct mode. 

1345# Additionally check that `file` is not a directory, as on Windows 

1346# directories pass the os.access check. 

1347def _access_check(fn, mode): 

1348 return (os.path.exists(fn) and os.access(fn, mode) 

1349 and not os.path.isdir(fn)) 

1350 

1351 

def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Locate `cmd` on a search path.

    Returns the first path to `cmd` that passes the access check for
    `mode`, or None if there is no such file.

    `mode` defaults to os.F_OK | os.X_OK.  `path` is a PATH-style string;
    it defaults to the result of os.environ.get("PATH"), falling back to
    os.confstr("CS_PATH") and then os.defpath when PATH is not set.

    """
    # A command with a directory part is looked up directly rather than
    # through the PATH directories.  This includes checking relative to
    # the current directory, e.g. ./script
    if os.path.dirname(cmd):
        return cmd if _access_check(cmd, mode) else None

    use_bytes = isinstance(cmd, bytes)

    if path is None:
        path = os.environ.get("PATH", None)
        if path is None:
            try:
                path = os.confstr("CS_PATH")
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath
        # bpo-35755: Don't use os.defpath if the PATH environment variable is
        # set to an empty string

    # PATH='' doesn't match, whereas PATH=':' looks in the current directory
    if not path:
        return None

    # Split the search path using the same string type as cmd.
    if use_bytes:
        search_dirs = os.fsencode(path).split(os.fsencode(os.pathsep))
    else:
        search_dirs = os.fsdecode(path).split(os.pathsep)

    if sys.platform == "win32":
        # The current directory takes precedence on Windows.
        curdir = os.fsencode(os.curdir) if use_bytes else os.curdir
        if curdir not in search_dirs:
            search_dirs.insert(0, curdir)

        # PATHEXT is necessary to check on Windows.
        pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
        if use_bytes:
            pathext = [os.fsencode(ext) for ext in pathext]
        # If cmd already ends with one of the PATHEXT extensions (e.g.
        # "python.exe"), test only that name; otherwise try cmd with each
        # extension appended.
        if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
            candidates = [cmd]
        else:
            candidates = [cmd + ext for ext in pathext]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        candidates = [cmd]

    visited = set()
    for directory in search_dirs:
        # Each directory is searched once, compared after case normalisation.
        key = os.path.normcase(directory)
        if key in visited:
            continue
        visited.add(key)
        for candidate in candidates:
            full_name = os.path.join(directory, candidate)
            if _access_check(full_name, mode):
                return full_name
    return None

1428 return None