Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/utils.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

235 statements  

1import os 

2import re 

3import sys 

4import typing as t 

5from functools import update_wrapper 

6from types import ModuleType 

7from types import TracebackType 

8 

9from ._compat import _default_text_stderr 

10from ._compat import _default_text_stdout 

11from ._compat import _find_binary_writer 

12from ._compat import auto_wrap_for_ansi 

13from ._compat import binary_streams 

14from ._compat import open_stream 

15from ._compat import should_strip_ansi 

16from ._compat import strip_ansi 

17from ._compat import text_streams 

18from ._compat import WIN 

19from .globals import resolve_color_default 

20 

21if t.TYPE_CHECKING: 

22 import typing_extensions as te 

23 

24 P = te.ParamSpec("P") 

25 

26R = t.TypeVar("R") 

27 

28 

29def _posixify(name: str) -> str: 

30 return "-".join(name.split()).lower() 

31 

32 

def safecall(func: "t.Callable[P, R]") -> "t.Callable[P, t.Optional[R]]":
    """Return a wrapper around *func* that swallows exceptions.

    The wrapper forwards all positional and keyword arguments to *func*.
    If *func* raises any :exc:`Exception` subclass, the exception is
    discarded and ``None`` is returned instead.

    :param func: the callable to protect.
    :return: *func*'s return value, or ``None`` when it raised.
    """

    def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> t.Optional[R]:
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Deliberate best-effort call: the failure is intentionally
            # ignored and reported to the caller as ``None``.
            return None

        return result

    # Copy the name, docstring, etc. of ``func`` onto the wrapper.
    return update_wrapper(wrapper, func)

44 

45 

def make_str(value: t.Any) -> str:
    """Convert *value* into a ``str``.

    ``bytes`` are decoded with the filesystem encoding; if that fails
    with a :exc:`UnicodeError` they are decoded as UTF-8 with undecodable
    bytes replaced. Any other object is passed through :class:`str`.
    """
    if not isinstance(value, bytes):
        return str(value)

    try:
        return value.decode(sys.getfilesystemencoding())
    except UnicodeError:
        # Fall back to a lossy decode rather than failing.
        return value.decode("utf-8", "replace")

54 

55 

def make_default_short_help(help: str, max_length: int = 45) -> str:
    """Return a condensed, single-line version of a help string.

    Only the first paragraph (text before the first blank line) is used
    and all whitespace runs are collapsed to single spaces. The result
    stops after the first word ending in ``.`` if that fits; otherwise,
    when the text is longer than *max_length*, whole words are dropped
    from the end and ``...`` is appended.

    :param help: the full help text.
    :param max_length: the length budget for the returned string.
    """
    # Keep only the text in front of the first blank line.
    first_paragraph, _, _ = help.partition("\n\n")

    # Splitting on whitespace collapses newlines, tabs, and spaces.
    words = first_paragraph.split()

    if not words:
        return ""

    # A leading "\b" is the "no rewrap" marker; it is not part of the text.
    if words[0] == "\b":
        words = words[1:]

    used = 0
    final_index = len(words) - 1

    for index, word in enumerate(words):
        # Every word after the first costs one extra character (the space).
        used += len(word) + (1 if index > 0 else 0)

        if used > max_length:
            # Over budget: truncate.
            break

        if word.endswith("."):
            # End of a sentence that fits: return it without "...".
            return " ".join(words[: index + 1])

        if used == max_length and index != final_index:
            # Budget exactly used but more words follow: truncate.
            break
    else:
        # Everything fit; nothing to truncate.
        return " ".join(words)

    # The "..." suffix counts against the budget too.
    used += len("...")
    keep = index

    # Drop trailing words until the text plus suffix fits.
    while keep > 0:
        used -= len(words[keep]) + 1

        if used <= max_length:
            break

        keep -= 1

    return " ".join(words[:keep]) + "..."

104 

105 

class LazyFile:
    """A file-like proxy that postpones opening the real file.

    Some basic checks are still done when the object is created so that
    an unusable filename is noticed early. The real file is opened on
    first use. This is useful for safely opening files for writing.
    """

    def __init__(
        self,
        filename: t.Union[str, "os.PathLike[str]"],
        mode: str = "r",
        encoding: t.Optional[str] = None,
        errors: t.Optional[str] = "strict",
        atomic: bool = False,
    ):
        self.name: str = os.fspath(filename)
        self.mode = mode
        self.encoding = encoding
        self.errors = errors
        self.atomic = atomic
        self._f: t.Optional[t.IO[t.Any]]
        self.should_close: bool

        if self.name == "-":
            # "-" is resolved right away through ``open_stream``.
            self._f, self.should_close = open_stream(filename, mode, encoding, errors)
            return

        if "r" in mode:
            # For read modes, open and immediately close the file so
            # that at least some errors surface early.
            open(filename, mode).close()

        self._f = None
        self.should_close = True

    def __getattr__(self, name: str) -> t.Any:
        # Unknown attributes are looked up on the (lazily opened) file.
        return getattr(self.open(), name)

    def __repr__(self) -> str:
        if self._f is None:
            return f"<unopened file '{format_filename(self.name)}' {self.mode}>"

        return repr(self._f)

    def open(self) -> t.IO[t.Any]:
        """Open the file if it is not open yet and return it.

        An :exc:`OSError` while opening is re-raised as a
        :exc:`FileError`. Not handling this error will produce an error
        that Click shows.
        """
        if self._f is not None:
            return self._f

        try:
            stream, self.should_close = open_stream(
                self.name, self.mode, self.encoding, self.errors, atomic=self.atomic
            )
        except OSError as e:  # noqa: E402
            from .exceptions import FileError

            raise FileError(self.name, hint=e.strerror) from e

        self._f = stream
        return stream

    def close(self) -> None:
        """Close the underlying file if it has been opened."""
        if self._f is None:
            return

        self._f.close()

    def close_intelligently(self) -> None:
        """Close the file only when ``should_close`` is set, i.e. when
        it was opened by the lazy file wrapper. For instance this will
        never close stdin.
        """
        if not self.should_close:
            return

        self.close()

    def __enter__(self) -> "LazyFile":
        return self

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        # Leaving the ``with`` block never closes a stream that this
        # wrapper does not own.
        self.close_intelligently()

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        # Iterating forces the file to be opened.
        self.open()
        return iter(self._f)  # type: ignore

192 

193 

class KeepOpenFile:
    """Proxy around a file object whose context manager does not close it.

    Attribute access, ``repr`` and iteration are forwarded to the wrapped
    file; leaving a ``with`` block does nothing.
    """

    def __init__(self, file: t.IO[t.Any]) -> None:
        self._file: t.IO[t.Any] = file

    def __repr__(self) -> str:
        return repr(self._file)

    def __getattr__(self, name: str) -> t.Any:
        # Anything not defined here is served by the wrapped file.
        return getattr(self._file, name)

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        return iter(self._file)

    def __enter__(self) -> "KeepOpenFile":
        return self

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        # Intentionally a no-op: the wrapped file stays open.
        return None

217 

218 

def echo(
    message: t.Optional[t.Any] = None,
    file: t.Optional[t.IO[t.Any]] = None,
    nl: bool = True,
    err: bool = False,
    color: t.Optional[bool] = None,
) -> None:
    """Print a message and newline to stdout or a file. This should be
    used instead of :func:`print` because it provides better support
    for different data, files, and environments.

    Compared to :func:`print`, this does the following:

    -   Ensures that the output encoding is not misconfigured on Linux.
    -   Supports Unicode in the Windows console.
    -   Supports writing to binary outputs, and supports writing bytes
        to text outputs.
    -   Supports colors and styles on Windows.
    -   Removes ANSI color and style codes if the output does not look
        like an interactive terminal.
    -   Always flushes the output.

    The ``message`` object is never modified; in particular a
    ``bytearray`` passed as ``message`` is left untouched when the
    trailing newline is added.

    :param message: The string or bytes to output. Other objects are
        converted to strings.
    :param file: The file to write to. Defaults to ``stdout``.
    :param err: Write to ``stderr`` instead of ``stdout``.
    :param nl: Print a newline after the message. Enabled by default.
    :param color: Force showing or hiding colors and other styles. By
        default Click will remove color if the output does not look like
        an interactive terminal.

    .. versionchanged:: 6.0
        Support Unicode output on the Windows console. Click does not
        modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()``
        will still not support Unicode.

    .. versionchanged:: 4.0
        Added the ``color`` parameter.

    .. versionadded:: 3.0
        Added the ``err`` parameter.

    .. versionchanged:: 2.0
        Support colors on Windows if colorama is installed.
    """
    if file is None:
        if err:
            file = _default_text_stderr()
        else:
            file = _default_text_stdout()

        # There are no standard streams attached to write to. For example,
        # pythonw on Windows.
        if file is None:
            return

    # Convert non bytes/text into the native string type.
    if message is not None and not isinstance(message, (str, bytes, bytearray)):
        out: t.Optional[t.Union[str, bytes]] = str(message)
    else:
        out = message

    if nl:
        out = out or ""
        if isinstance(out, str):
            out += "\n"
        else:
            # Build a new object instead of using ``+=``: for a
            # ``bytearray`` the augmented assignment extends the
            # caller's buffer in place.
            out = out + b"\n"

    if not out:
        file.flush()
        return

    # If there is a message and the value looks like bytes, we manually
    # need to find the binary stream and write the message in there.
    # This is done separately so that most stream types will work as you
    # would expect. Eg: you can write to StringIO for other cases.
    if isinstance(out, (bytes, bytearray)):
        binary_file = _find_binary_writer(file)

        if binary_file is not None:
            # Flush the text layer first so output stays in order.
            file.flush()
            binary_file.write(out)
            binary_file.flush()
            return

    # ANSI style code support. For no message or bytes, nothing happens.
    # When outputting to a file instead of a terminal, strip codes.
    else:
        color = resolve_color_default(color)

        if should_strip_ansi(file, color):
            out = strip_ansi(out)
        elif WIN:
            if auto_wrap_for_ansi is not None:
                file = auto_wrap_for_ansi(file)  # type: ignore
            elif not color:
                out = strip_ansi(out)

    file.write(out)  # type: ignore
    file.flush()

320 

321 

def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']") -> t.BinaryIO:
    """Return a system stream for byte processing.

    :param name: the name of the stream to open. Valid names are
        ``'stdin'``, ``'stdout'`` and ``'stderr'``.
    :raises TypeError: if no opener is registered under ``name``.
    """
    make_stream = binary_streams.get(name)

    if make_stream is not None:
        return make_stream()

    raise TypeError(f"Unknown standard stream '{name}'")

332 

333 

def get_text_stream(
    name: "te.Literal['stdin', 'stdout', 'stderr']",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
) -> t.TextIO:
    """Return a system stream for text processing.

    This usually returns a wrapped stream around a binary stream
    returned from :func:`get_binary_stream`, but it can also take
    shortcuts for already correctly configured streams.

    :param name: the name of the stream to open. Valid names are
        ``'stdin'``, ``'stdout'`` and ``'stderr'``.
    :param encoding: overrides the detected default encoding.
    :param errors: overrides the default error mode.
    :raises TypeError: if no opener is registered under ``name``.
    """
    make_stream = text_streams.get(name)

    if make_stream is not None:
        return make_stream(encoding, errors)

    raise TypeError(f"Unknown standard stream '{name}'")

353 

354 

def open_file(
    filename: str,
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    lazy: bool = False,
    atomic: bool = False,
) -> t.IO[t.Any]:
    """Open a file, with extra behavior to handle ``'-'`` to indicate
    a standard stream, lazy open on write, and atomic write. Similar to
    the behavior of the :class:`~click.File` param type.

    If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is
    wrapped so that using it in a context manager will not close it.
    This makes it possible to use the function without accidentally
    closing a standard stream:

    .. code-block:: python

        with open_file(filename) as f:
            ...

    :param filename: The name of the file to open, or ``'-'`` for
        ``stdin``/``stdout``.
    :param mode: The mode in which to open the file.
    :param encoding: The encoding to decode or encode a file opened in
        text mode.
    :param errors: The error handling mode.
    :param lazy: Wait to open the file until it is accessed. For read
        mode, the file is temporarily opened to raise access errors
        early, then closed until it is read again.
    :param atomic: Write to a temporary file and replace the given file
        on close.

    .. versionadded:: 3.0
    """
    if not lazy:
        stream, owns_stream = open_stream(
            filename, mode, encoding, errors, atomic=atomic
        )

        if owns_stream:
            return stream

        # The stream should not be closed by the caller: hand out a
        # proxy whose context manager leaves it open.
        return t.cast(t.IO[t.Any], KeepOpenFile(stream))

    deferred = LazyFile(filename, mode, encoding, errors, atomic=atomic)
    return t.cast(t.IO[t.Any], deferred)

402 

403 

def format_filename(
    filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]",
    shorten: bool = False,
) -> str:
    """Format a filename as a string for display. Ensures the filename can be
    displayed by replacing any invalid bytes or surrogate escapes in the name
    with the replacement character ``�``.

    Invalid bytes or surrogate escapes will raise an error when written to a
    stream with ``errors="strict"``. This will typically happen with ``stdout``
    when the locale is something like ``en_GB.UTF-8``.

    Many scenarios *are* safe to write surrogates though, due to PEP 538 and
    PEP 540, including:

    -   Writing to ``stderr``, which uses ``errors="backslashreplace"``.
    -   The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens
        stdout and stderr with ``errors="surrogateescape"``.
    -   None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``.
    -   Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``.
        Python opens stdout and stderr with ``errors="surrogateescape"``.

    :param filename: the filename to format for UI display. It is converted
        to ``str`` without failing.
    :param shorten: if set, strip off the path that leads up to the file
        name and keep only the final component.
    """
    name = os.path.basename(filename) if shorten else os.fspath(filename)

    if isinstance(name, bytes):
        # Bytes paths: decode with the filesystem encoding, replacing
        # anything that cannot be decoded.
        return name.decode(sys.getfilesystemencoding(), "replace")

    # Text paths: turn surrogate escapes back into raw bytes, then decode
    # again so those bytes become the replacement character.
    raw = name.encode("utf-8", "surrogateescape")
    return raw.decode("utf-8", "replace")

444 

445 

def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
    r"""Returns the config folder for the application.  The default behavior
    is to return whatever is most appropriate for the operating system.

    To give you an idea, for an app called ``"Foo Bar"``, something like
    the following folders could be returned:

    Mac OS X:
      ``~/Library/Application Support/Foo Bar``
    Mac OS X (POSIX):
      ``~/.foo-bar``
    Unix:
      ``~/.config/foo-bar``
    Unix (POSIX):
      ``~/.foo-bar``
    Windows (roaming):
      ``C:\Users\<user>\AppData\Roaming\Foo Bar``
    Windows (not roaming):
      ``C:\Users\<user>\AppData\Local\Foo Bar``

    .. versionadded:: 2.0

    :param app_name: the application name.  This should be properly capitalized
                     and can contain whitespace.
    :param roaming: controls if the folder should be roaming or not on Windows.
                    Has no effect otherwise.
    :param force_posix: if this is set to `True` then on any POSIX system the
                        folder will be stored in the home folder with a leading
                        dot instead of the XDG config home or darwin's
                        application support folder.
    """
    if WIN:
        env_key = "APPDATA" if roaming else "LOCALAPPDATA"
        base_dir = os.environ.get(env_key)

        # Fall back to the home directory when the variable is unset.
        if base_dir is None:
            base_dir = os.path.expanduser("~")

        return os.path.join(base_dir, app_name)

    if force_posix:
        dotted_home_dir = os.path.expanduser(f"~/.{_posixify(app_name)}")
        return os.path.join(dotted_home_dir)

    if sys.platform == "darwin":
        support_dir = os.path.expanduser("~/Library/Application Support")
        return os.path.join(support_dir, app_name)

    # Everything else: XDG config home, defaulting to ``~/.config``.
    config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
    return os.path.join(config_home, _posixify(app_name))

493 

494 

class PacifyFlushWrapper:
    """Proxy that silences broken-pipe errors raised by ``.flush()``.

    This wrapper is used to catch and suppress BrokenPipeErrors resulting
    from ``.flush()`` being called on a broken pipe during the
    shutdown/final-GC of the Python interpreter. Notably ``.flush()`` is
    always called on ``sys.stdout`` and ``sys.stderr``. To have minimal
    impact on other cleanup code, and on the case where the underlying
    file is not a broken pipe, every other call and attribute is proxied
    to the wrapped file.
    """

    def __init__(self, wrapped: t.IO[t.Any]) -> None:
        self.wrapped = wrapped

    def __getattr__(self, attr: str) -> t.Any:
        # Everything except ``flush`` and ``wrapped`` comes from the file.
        return getattr(self.wrapped, attr)

    def flush(self) -> None:
        """Flush the wrapped file, ignoring ``EPIPE`` errors only."""
        try:
            self.wrapped.flush()
        except OSError as exc:
            import errno

            if exc.errno == errno.EPIPE:
                return

            # Any other OS-level failure is still reported.
            raise

519 

520def _detect_program_name( 

521 path: t.Optional[str] = None, _main: t.Optional[ModuleType] = None 

522) -> str: 

523 """Determine the command used to run the program, for use in help 

524 text. If a file or entry point was executed, the file name is 

525 returned. If ``python -m`` was used to execute a module or package, 

526 ``python -m name`` is returned. 

527 

528 This doesn't try to be too precise, the goal is to give a concise 

529 name for help text. Files are only shown as their name without the 

530 path. ``python`` is only shown for modules, and the full path to 

531 ``sys.executable`` is not shown. 

532 

533 :param path: The Python file being executed. Python puts this in 

534 ``sys.argv[0]``, which is used by default. 

535 :param _main: The ``__main__`` module. This should only be passed 

536 during internal testing. 

537 

538 .. versionadded:: 8.0 

539 Based on command args detection in the Werkzeug reloader. 

540 

541 :meta private: 

542 """ 

543 if _main is None: 

544 _main = sys.modules["__main__"] 

545 

546 if not path: 

547 path = sys.argv[0] 

548 

549 # The value of __package__ indicates how Python was called. It may 

550 # not exist if a setuptools script is installed as an egg. It may be 

551 # set incorrectly for entry points created with pip on Windows. 

552 # It is set to "" inside a Shiv or PEX zipapp. 

553 if getattr(_main, "__package__", None) in {None, ""} or ( 

554 os.name == "nt" 

555 and _main.__package__ == "" 

556 and not os.path.exists(path) 

557 and os.path.exists(f"{path}.exe") 

558 ): 

559 # Executed a file, like "python app.py". 

560 return os.path.basename(path) 

561 

562 # Executed a module, like "python -m example". 

563 # Rewritten by Python from "-m script" to "/path/to/script.py". 

564 # Need to look at main module to determine how it was executed. 

565 py_module = t.cast(str, _main.__package__) 

566 name = os.path.splitext(os.path.basename(path))[0] 

567 

568 # A submodule like "example.cli". 

569 if name != "__main__": 

570 py_module = f"{py_module}.{name}" 

571 

572 return f"python -m {py_module.lstrip('.')}" 

573 

574 

575def _expand_args( 

576 args: t.Iterable[str], 

577 *, 

578 user: bool = True, 

579 env: bool = True, 

580 glob_recursive: bool = True, 

581) -> t.List[str]: 

582 """Simulate Unix shell expansion with Python functions. 

583 

584 See :func:`glob.glob`, :func:`os.path.expanduser`, and 

585 :func:`os.path.expandvars`. 

586 

587 This is intended for use on Windows, where the shell does not do any 

588 expansion. It may not exactly match what a Unix shell would do. 

589 

590 :param args: List of command line arguments to expand. 

591 :param user: Expand user home directory. 

592 :param env: Expand environment variables. 

593 :param glob_recursive: ``**`` matches directories recursively. 

594 

595 .. versionchanged:: 8.1 

596 Invalid glob patterns are treated as empty expansions rather 

597 than raising an error. 

598 

599 .. versionadded:: 8.0 

600 

601 :meta private: 

602 """ 

603 from glob import glob 

604 

605 out = [] 

606 

607 for arg in args: 

608 if user: 

609 arg = os.path.expanduser(arg) 

610 

611 if env: 

612 arg = os.path.expandvars(arg) 

613 

614 try: 

615 matches = glob(arg, recursive=glob_recursive) 

616 except re.error: 

617 matches = [] 

618 

619 if not matches: 

620 out.append(arg) 

621 else: 

622 out.extend(matches) 

623 

624 return out