Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/utils.py: 23%

228 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:07 +0000

1import os 

2import re 

3import sys 

4import typing as t 

5from functools import update_wrapper 

6from types import ModuleType 

7 

8from ._compat import _default_text_stderr 

9from ._compat import _default_text_stdout 

10from ._compat import _find_binary_writer 

11from ._compat import auto_wrap_for_ansi 

12from ._compat import binary_streams 

13from ._compat import get_filesystem_encoding 

14from ._compat import open_stream 

15from ._compat import should_strip_ansi 

16from ._compat import strip_ansi 

17from ._compat import text_streams 

18from ._compat import WIN 

19from .globals import resolve_color_default 

20 

21if t.TYPE_CHECKING: 

22 import typing_extensions as te 

23 

24 P = te.ParamSpec("P") 

25 

26R = t.TypeVar("R") 

27 

28 

29def _posixify(name: str) -> str: 

30 return "-".join(name.split()).lower() 

31 

32 

def safecall(func: "t.Callable[P, R]") -> "t.Callable[P, t.Optional[R]]":
    """Return a wrapper around ``func`` that suppresses exceptions.

    The wrapper forwards every positional and keyword argument to
    ``func`` and hands back its result. If ``func`` raises an
    :exc:`Exception` subclass, the error is discarded and ``None`` is
    returned instead.
    """

    def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> "t.Optional[R]":
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Deliberate best-effort call: the failure is dropped silently.
            return None
        return result

    return update_wrapper(wrapper, func)

44 

45 

def make_str(value: t.Any) -> str:
    """Turn ``value`` into a ``str``.

    ``bytes`` are decoded with the encoding reported by
    ``get_filesystem_encoding()``; if that raises :exc:`UnicodeError`
    they are decoded as UTF-8 with undecodable bytes replaced. Any other
    object is passed through :class:`str`.
    """
    if not isinstance(value, bytes):
        return str(value)

    try:
        return value.decode(get_filesystem_encoding())
    except UnicodeError:
        # Fall back to a decode that cannot fail.
        return value.decode("utf-8", "replace")

54 

55 

def make_default_short_help(help: str, max_length: int = 45) -> str:
    """Condense a help string into a short, single-line summary.

    Only the first paragraph (the text before the first blank line) is
    used, and runs of whitespace are collapsed into single spaces. The
    summary stops after the first word ending in ``"."`` if that fits in
    ``max_length``; otherwise whole words are dropped from the end and
    ``"..."`` is appended.

    :param help: The full help text.
    :param max_length: Length budget for the summary, including the
        ``"..."`` suffix when one is added.
    """
    # Keep only what precedes the first blank line.
    first_paragraph, _, _ = help.partition("\n\n")

    # str.split() with no argument collapses newlines, tabs and spaces.
    tokens = first_paragraph.split()

    if not tokens:
        return ""

    # A leading "\b" is the "no rewrap" marker, not part of the text.
    if tokens[0] == "\b":
        del tokens[0]

    final = len(tokens) - 1
    used = 0

    for cut, token in enumerate(tokens):
        # Every word after the first costs one extra character (the space).
        used += len(token) + (1 if cut > 0 else 0)

        if used > max_length:
            # Over budget: truncate.
            break

        if token.endswith("."):
            # Sentence end reached within budget: no "..." needed.
            return " ".join(tokens[: cut + 1])

        if used == max_length and cut != final:
            # Budget exactly used up but more words follow: truncate.
            break
    else:
        # Every word fit.
        return " ".join(tokens)

    # Reserve room for the suffix.
    used += len("...")

    # Drop trailing words until the text plus suffix fits.
    while cut > 0:
        used -= len(tokens[cut]) + 1

        if used <= max_length:
            break

        cut -= 1

    return " ".join(tokens[:cut]) + "..."

104 

105 

class LazyFile:
    """File-like proxy that postpones opening the real file.

    Construction only performs a cheap sanity check on the filename; the
    underlying stream is created on first use through :meth:`open`. This
    is useful for safely opening files for writing.
    """

    def __init__(
        self,
        filename: str,
        mode: str = "r",
        encoding: t.Optional[str] = None,
        errors: t.Optional[str] = "strict",
        atomic: bool = False,
    ):
        self.name = filename
        self.mode = mode
        self.encoding = encoding
        self.errors = errors
        self.atomic = atomic
        self._f: t.Optional[t.IO[t.Any]]

        if filename == "-":
            # "-" is resolved immediately via open_stream, which also
            # reports whether the stream should be closed later.
            self._f, self.should_close = open_stream(filename, mode, encoding, errors)
            return

        if "r" in mode:
            # For read modes, open and immediately close the file so
            # that at least some errors surface early.
            with open(filename, mode):
                pass

        self._f = None
        self.should_close = True

    def __getattr__(self, name: str) -> t.Any:
        # Attributes not found on the proxy are looked up on the real
        # file, which is opened on demand.
        stream = self.open()
        return getattr(stream, name)

    def __repr__(self) -> str:
        if self._f is None:
            return f"<unopened file '{self.name}' {self.mode}>"
        return repr(self._f)

    def open(self) -> t.IO[t.Any]:
        """Return the underlying file, opening it on first use.

        An :exc:`OSError` from opening is re-raised as a
        :exc:`FileError`. Not handling this error will produce an error
        that Click shows.
        """
        if self._f is None:
            try:
                stream, self.should_close = open_stream(
                    self.name,
                    self.mode,
                    self.encoding,
                    self.errors,
                    atomic=self.atomic,
                )
            except OSError as exc:  # noqa: E402
                from .exceptions import FileError

                raise FileError(self.name, hint=exc.strerror) from exc
            self._f = stream
        return self._f

    def close(self) -> None:
        """Close the underlying file if it has been opened."""
        if self._f is None:
            return
        self._f.close()

    def close_intelligently(self) -> None:
        """Close the file only when ``should_close`` is set, so a
        stream the wrapper should not close (for instance stdin) is
        left open.
        """
        if not self.should_close:
            return
        self.close()

    def __enter__(self) -> "LazyFile":
        return self

    def __exit__(self, *_: t.Any) -> None:
        self.close_intelligently()

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        # open() returns the very object stored in ``self._f``.
        return iter(self.open())  # type: ignore

186 

187 

class KeepOpenFile:
    """Proxy around a file object whose context manager exit does
    nothing, so leaving a ``with`` block does not close the wrapped
    file. All other attribute access is forwarded to the wrapped file.
    """

    def __init__(self, file: t.IO[t.Any]) -> None:
        self._file = file

    def __getattr__(self, name: str) -> t.Any:
        # Anything not defined on the proxy comes from the wrapped file.
        target = self._file
        return getattr(target, name)

    def __enter__(self) -> "KeepOpenFile":
        return self

    def __exit__(self, *exc_info: t.Any) -> None:
        # Intentionally leave the wrapped file open.
        return None

    def __repr__(self) -> str:
        return repr(self._file)

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        return iter(self._file)

206 

207 

def echo(
    message: t.Optional[t.Any] = None,
    file: t.Optional[t.IO[t.Any]] = None,
    nl: bool = True,
    err: bool = False,
    color: t.Optional[bool] = None,
) -> None:
    """Print a message and newline to stdout or a file. This should be
    used instead of :func:`print` because it provides better support
    for different data, files, and environments.

    Compared to :func:`print`, this does the following:

    -   Ensures that the output encoding is not misconfigured on Linux.
    -   Supports Unicode in the Windows console.
    -   Supports writing to binary outputs, and supports writing bytes
        to text outputs.
    -   Supports colors and styles on Windows.
    -   Removes ANSI color and style codes if the output does not look
        like an interactive terminal.
    -   Always flushes the output.

    A ``bytearray`` passed as ``message`` is never modified; the
    trailing newline is added to a new object.

    :param message: The string or bytes to output. Other objects are
        converted to strings.
    :param file: The file to write to. Defaults to ``stdout``.
    :param err: Write to ``stderr`` instead of ``stdout``.
    :param nl: Print a newline after the message. Enabled by default.
    :param color: Force showing or hiding colors and other styles. By
        default Click will remove color if the output does not look like
        an interactive terminal.

    .. versionchanged:: 6.0
        Support Unicode output on the Windows console. Click does not
        modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()``
        will still not support Unicode.

    .. versionchanged:: 4.0
        Added the ``color`` parameter.

    .. versionadded:: 3.0
        Added the ``err`` parameter.

    .. versionchanged:: 2.0
        Support colors on Windows if colorama is installed.
    """
    if file is None:
        if err:
            file = _default_text_stderr()
        else:
            file = _default_text_stdout()

    # Convert non bytes/text into the native string type.
    if message is not None and not isinstance(message, (str, bytes, bytearray)):
        out: t.Optional[t.Union[str, bytes]] = str(message)
    else:
        out = message

    if nl:
        out = out or ""
        if isinstance(out, str):
            out += "\n"
        else:
            # Concatenate into a new object instead of using ``+=``:
            # for a bytearray ``+=`` extends in place, which would
            # append the newline to the caller's own object.
            out = out + b"\n"

    if not out:
        # Nothing to write, but the stream is still flushed.
        file.flush()
        return

    # If there is a message and the value looks like bytes, we manually
    # need to find the binary stream and write the message in there.
    # This is done separately so that most stream types will work as you
    # would expect. Eg: you can write to StringIO for other cases.
    if isinstance(out, (bytes, bytearray)):
        binary_file = _find_binary_writer(file)

        if binary_file is not None:
            # Flush pending text first so output order is preserved.
            file.flush()
            binary_file.write(out)
            binary_file.flush()
            return

    # ANSI style code support. For no message or bytes, nothing happens.
    # When outputting to a file instead of a terminal, strip codes.
    else:
        color = resolve_color_default(color)

        if should_strip_ansi(file, color):
            out = strip_ansi(out)
        elif WIN:
            if auto_wrap_for_ansi is not None:
                file = auto_wrap_for_ansi(file)  # type: ignore
            elif not color:
                out = strip_ansi(out)

    file.write(out)  # type: ignore
    file.flush()

304 

305 

def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']") -> t.BinaryIO:
    """Return the system stream called ``name`` for byte processing.

    :param name: the name of the stream to open. Valid names are ``'stdin'``,
                 ``'stdout'`` and ``'stderr'``
    :raises TypeError: if ``name`` has no entry in ``binary_streams``.
    """
    factory = binary_streams.get(name)
    if factory is not None:
        return factory()
    raise TypeError(f"Unknown standard stream '{name}'")

316 

317 

def get_text_stream(
    name: "te.Literal['stdin', 'stdout', 'stderr']",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
) -> t.TextIO:
    """Return the system stream called ``name`` for text processing.

    This usually returns a wrapped stream around a binary stream
    returned from :func:`get_binary_stream` but it also can take
    shortcuts for already correctly configured streams.

    :param name: the name of the stream to open. Valid names are ``'stdin'``,
                 ``'stdout'`` and ``'stderr'``
    :param encoding: overrides the detected default encoding.
    :param errors: overrides the default error mode.
    :raises TypeError: if ``name`` has no entry in ``text_streams``.
    """
    factory = text_streams.get(name)
    if factory is not None:
        return factory(encoding, errors)
    raise TypeError(f"Unknown standard stream '{name}'")

337 

338 

def open_file(
    filename: str,
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    lazy: bool = False,
    atomic: bool = False,
) -> t.IO[t.Any]:
    """Open a file, with extra behavior to handle ``'-'`` to indicate
    a standard stream, lazy open on write, and atomic write. Similar to
    the behavior of the :class:`~click.File` param type.

    If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is
    wrapped so that using it in a context manager will not close it.
    This makes it possible to use the function without accidentally
    closing a standard stream:

    .. code-block:: python

        with open_file(filename) as f:
            ...

    :param filename: The name of the file to open, or ``'-'`` for
        ``stdin``/``stdout``.
    :param mode: The mode in which to open the file.
    :param encoding: The encoding to decode or encode a file opened in
        text mode.
    :param errors: The error handling mode.
    :param lazy: Wait to open the file until it is accessed. For read
        mode, the file is temporarily opened to raise access errors
        early, then closed until it is read again.
    :param atomic: Write to a temporary file and replace the given file
        on close.

    .. versionadded:: 3.0
    """
    if not lazy:
        stream, owned = open_stream(filename, mode, encoding, errors, atomic=atomic)

        if owned:
            return stream

        # A stream that should not be closed is wrapped so that a
        # ``with`` block leaves it open.
        return t.cast(t.IO[t.Any], KeepOpenFile(stream))

    deferred = LazyFile(filename, mode, encoding, errors, atomic=atomic)
    return t.cast(t.IO[t.Any], deferred)

386 

387 

388def format_filename( 

389 filename: t.Union[str, bytes, "os.PathLike[t.AnyStr]"], shorten: bool = False 

390) -> str: 

391 """Formats a filename for user display. The main purpose of this 

392 function is to ensure that the filename can be displayed at all. This 

393 will decode the filename to unicode if necessary in a way that it will 

394 not fail. Optionally, it can shorten the filename to not include the 

395 full path to the filename. 

396 

397 :param filename: formats a filename for UI display. This will also convert 

398 the filename into unicode without failing. 

399 :param shorten: this optionally shortens the filename to strip of the 

400 path that leads up to it. 

401 """ 

402 if shorten: 

403 filename = os.path.basename(filename) 

404 

405 return os.fsdecode(filename) 

406 

407 

def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
    r"""Returns the config folder for the application.  The default behavior
    is to return whatever is most appropriate for the operating system.

    To give you an idea, for an app called ``"Foo Bar"``, something like
    the following folders could be returned:

    Mac OS X:
      ``~/Library/Application Support/Foo Bar``
    Mac OS X (POSIX):
      ``~/.foo-bar``
    Unix:
      ``~/.config/foo-bar``
    Unix (POSIX):
      ``~/.foo-bar``
    Windows (roaming):
      ``C:\Users\<user>\AppData\Roaming\Foo Bar``
    Windows (not roaming):
      ``C:\Users\<user>\AppData\Local\Foo Bar``

    On Unix, ``XDG_CONFIG_HOME`` is used as the base folder when it is
    set to a non-empty value; an unset or empty variable falls back to
    ``~/.config``.

    .. versionadded:: 2.0

    :param app_name: the application name.  This should be properly capitalized
                     and can contain whitespace.
    :param roaming: controls if the folder should be roaming or not on Windows.
                    Has no effect otherwise.
    :param force_posix: if this is set to `True` then on any POSIX system the
                        folder will be stored in the home folder with a leading
                        dot instead of the XDG config home or darwin's
                        application support folder.
    """
    if WIN:
        key = "APPDATA" if roaming else "LOCALAPPDATA"
        folder = os.environ.get(key)
        if folder is None:
            folder = os.path.expanduser("~")
        return os.path.join(folder, app_name)

    if force_posix:
        return os.path.expanduser(f"~/.{_posixify(app_name)}")

    if sys.platform == "darwin":
        return os.path.join(
            os.path.expanduser("~/Library/Application Support"), app_name
        )

    # An empty XDG_CONFIG_HOME must be treated like an unset one;
    # otherwise the join below would yield a path relative to the
    # current working directory.
    config_home = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser("~/.config")
    return os.path.join(config_home, _posixify(app_name))

455 

456 

class PacifyFlushWrapper:
    """Proxy for a stream whose ``flush()`` ignores broken pipes.

    This wrapper is used to catch and suppress ``BrokenPipeError``
    resulting from ``.flush()`` being called on a broken pipe during
    the shutdown/final-GC of the Python interpreter. Notably
    ``.flush()`` is always called on ``sys.stdout`` and ``sys.stderr``.
    To keep the impact on other cleanup code minimal, every other
    attribute is proxied to the wrapped stream.
    """

    def __init__(self, wrapped: t.IO[t.Any]) -> None:
        self.wrapped = wrapped

    def flush(self) -> None:
        try:
            self.wrapped.flush()
        except OSError as exc:
            import errno

            if exc.errno == errno.EPIPE:
                # Broken pipe: swallow it.
                return

            raise

    def __getattr__(self, attr: str) -> t.Any:
        target = self.wrapped
        return getattr(target, attr)

480 

481 

482def _detect_program_name( 

483 path: t.Optional[str] = None, _main: t.Optional[ModuleType] = None 

484) -> str: 

485 """Determine the command used to run the program, for use in help 

486 text. If a file or entry point was executed, the file name is 

487 returned. If ``python -m`` was used to execute a module or package, 

488 ``python -m name`` is returned. 

489 

490 This doesn't try to be too precise, the goal is to give a concise 

491 name for help text. Files are only shown as their name without the 

492 path. ``python`` is only shown for modules, and the full path to 

493 ``sys.executable`` is not shown. 

494 

495 :param path: The Python file being executed. Python puts this in 

496 ``sys.argv[0]``, which is used by default. 

497 :param _main: The ``__main__`` module. This should only be passed 

498 during internal testing. 

499 

500 .. versionadded:: 8.0 

501 Based on command args detection in the Werkzeug reloader. 

502 

503 :meta private: 

504 """ 

505 if _main is None: 

506 _main = sys.modules["__main__"] 

507 

508 if not path: 

509 path = sys.argv[0] 

510 

511 # The value of __package__ indicates how Python was called. It may 

512 # not exist if a setuptools script is installed as an egg. It may be 

513 # set incorrectly for entry points created with pip on Windows. 

514 if getattr(_main, "__package__", None) is None or ( 

515 os.name == "nt" 

516 and _main.__package__ == "" 

517 and not os.path.exists(path) 

518 and os.path.exists(f"{path}.exe") 

519 ): 

520 # Executed a file, like "python app.py". 

521 return os.path.basename(path) 

522 

523 # Executed a module, like "python -m example". 

524 # Rewritten by Python from "-m script" to "/path/to/script.py". 

525 # Need to look at main module to determine how it was executed. 

526 py_module = t.cast(str, _main.__package__) 

527 name = os.path.splitext(os.path.basename(path))[0] 

528 

529 # A submodule like "example.cli". 

530 if name != "__main__": 

531 py_module = f"{py_module}.{name}" 

532 

533 return f"python -m {py_module.lstrip('.')}" 

534 

535 

536def _expand_args( 

537 args: t.Iterable[str], 

538 *, 

539 user: bool = True, 

540 env: bool = True, 

541 glob_recursive: bool = True, 

542) -> t.List[str]: 

543 """Simulate Unix shell expansion with Python functions. 

544 

545 See :func:`glob.glob`, :func:`os.path.expanduser`, and 

546 :func:`os.path.expandvars`. 

547 

548 This is intended for use on Windows, where the shell does not do any 

549 expansion. It may not exactly match what a Unix shell would do. 

550 

551 :param args: List of command line arguments to expand. 

552 :param user: Expand user home directory. 

553 :param env: Expand environment variables. 

554 :param glob_recursive: ``**`` matches directories recursively. 

555 

556 .. versionchanged:: 8.1 

557 Invalid glob patterns are treated as empty expansions rather 

558 than raising an error. 

559 

560 .. versionadded:: 8.0 

561 

562 :meta private: 

563 """ 

564 from glob import glob 

565 

566 out = [] 

567 

568 for arg in args: 

569 if user: 

570 arg = os.path.expanduser(arg) 

571 

572 if env: 

573 arg = os.path.expandvars(arg) 

574 

575 try: 

576 matches = glob(arg, recursive=glob_recursive) 

577 except re.error: 

578 matches = [] 

579 

580 if not matches: 

581 out.append(arg) 

582 else: 

583 out.extend(matches) 

584 

585 return out