Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/psutil-7.2.0-py3.11-linux-x86_64.egg/psutil/_common.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

386 statements  

1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 

2# Use of this source code is governed by a BSD-style license that can be 

3# found in the LICENSE file. 

4 

5"""Common objects shared by __init__.py and _ps*.py modules. 

6 

7Note: this module is imported by setup.py, so it should not import 

8psutil or third-party modules. 

9""" 

10 

11import collections 

12import enum 

13import functools 

14import os 

15import socket 

16import stat 

17import sys 

18import threading 

19import warnings 

20from socket import AF_INET 

21from socket import SOCK_DGRAM 

22from socket import SOCK_STREAM 

23 

24try: 

25 from socket import AF_INET6 

26except ImportError: 

27 AF_INET6 = None 

28try: 

29 from socket import AF_UNIX 

30except ImportError: 

31 AF_UNIX = None 

32 

33 

34PSUTIL_DEBUG = bool(os.getenv('PSUTIL_DEBUG')) 

35_DEFAULT = object() 

36 

37# fmt: off 

38__all__ = [ 

39 # OS constants 

40 'FREEBSD', 'BSD', 'LINUX', 'NETBSD', 'OPENBSD', 'MACOS', 'OSX', 'POSIX', 

41 'SUNOS', 'WINDOWS', 

42 # connection constants 

43 'CONN_CLOSE', 'CONN_CLOSE_WAIT', 'CONN_CLOSING', 'CONN_ESTABLISHED', 

44 'CONN_FIN_WAIT1', 'CONN_FIN_WAIT2', 'CONN_LAST_ACK', 'CONN_LISTEN', 

45 'CONN_NONE', 'CONN_SYN_RECV', 'CONN_SYN_SENT', 'CONN_TIME_WAIT', 

46 # net constants 

47 'NIC_DUPLEX_FULL', 'NIC_DUPLEX_HALF', 'NIC_DUPLEX_UNKNOWN', # noqa: F822 

48 # process status constants 

49 'STATUS_DEAD', 'STATUS_DISK_SLEEP', 'STATUS_IDLE', 'STATUS_LOCKED', 

50 'STATUS_RUNNING', 'STATUS_SLEEPING', 'STATUS_STOPPED', 'STATUS_SUSPENDED', 

51 'STATUS_TRACING_STOP', 'STATUS_WAITING', 'STATUS_WAKE_KILL', 

52 'STATUS_WAKING', 'STATUS_ZOMBIE', 'STATUS_PARKED', 

53 # other constants 

54 'ENCODING', 'ENCODING_ERRS', 'AF_INET6', 

55 # utility functions 

56 'conn_tmap', 'deprecated_method', 'isfile_strict', 'memoize', 

57 'parse_environ_block', 'path_exists_strict', 'usage_percent', 

58 'supports_ipv6', 'sockfam_to_enum', 'socktype_to_enum', "wrap_numbers", 

59 'open_text', 'open_binary', 'cat', 'bcat', 

60 'bytes2human', 'conn_to_ntuple', 'debug', 

61 # shell utils 

62 'hilite', 'term_supports_colors', 'print_color', 

63] 

64# fmt: on 

65 

66 

67# =================================================================== 

68# --- OS constants 

69# =================================================================== 

70 

71 

72POSIX = os.name == "posix" 

73WINDOWS = os.name == "nt" 

74LINUX = sys.platform.startswith("linux") 

75MACOS = sys.platform.startswith("darwin") 

76OSX = MACOS # deprecated alias 

77FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd")) 

78OPENBSD = sys.platform.startswith("openbsd") 

79NETBSD = sys.platform.startswith("netbsd") 

80BSD = FREEBSD or OPENBSD or NETBSD 

81SUNOS = sys.platform.startswith(("sunos", "solaris")) 

82AIX = sys.platform.startswith("aix") 

83 

84 

85# =================================================================== 

86# --- API constants 

87# =================================================================== 

88 

89 

90# Process.status() 

91STATUS_RUNNING = "running" 

92STATUS_SLEEPING = "sleeping" 

93STATUS_DISK_SLEEP = "disk-sleep" 

94STATUS_STOPPED = "stopped" 

95STATUS_TRACING_STOP = "tracing-stop" 

96STATUS_ZOMBIE = "zombie" 

97STATUS_DEAD = "dead" 

98STATUS_WAKE_KILL = "wake-kill" 

99STATUS_WAKING = "waking" 

100STATUS_IDLE = "idle" # Linux, macOS, FreeBSD 

101STATUS_LOCKED = "locked" # FreeBSD 

102STATUS_WAITING = "waiting" # FreeBSD 

103STATUS_SUSPENDED = "suspended" # NetBSD 

104STATUS_PARKED = "parked" # Linux 

105 

106# Process.net_connections() and psutil.net_connections() 

107CONN_ESTABLISHED = "ESTABLISHED" 

108CONN_SYN_SENT = "SYN_SENT" 

109CONN_SYN_RECV = "SYN_RECV" 

110CONN_FIN_WAIT1 = "FIN_WAIT1" 

111CONN_FIN_WAIT2 = "FIN_WAIT2" 

112CONN_TIME_WAIT = "TIME_WAIT" 

113CONN_CLOSE = "CLOSE" 

114CONN_CLOSE_WAIT = "CLOSE_WAIT" 

115CONN_LAST_ACK = "LAST_ACK" 

116CONN_LISTEN = "LISTEN" 

117CONN_CLOSING = "CLOSING" 

118CONN_NONE = "NONE" 

119 

120 

121# net_if_stats() 

122class NicDuplex(enum.IntEnum): 

123 NIC_DUPLEX_FULL = 2 

124 NIC_DUPLEX_HALF = 1 

125 NIC_DUPLEX_UNKNOWN = 0 

126 

127 

128globals().update(NicDuplex.__members__) 

129 

130 

131# sensors_battery() 

132class BatteryTime(enum.IntEnum): 

133 POWER_TIME_UNKNOWN = -1 

134 POWER_TIME_UNLIMITED = -2 

135 

136 

137globals().update(BatteryTime.__members__) 

138 

139# --- others 

140 

141ENCODING = sys.getfilesystemencoding() 

142ENCODING_ERRS = sys.getfilesystemencodeerrors() 

143 

144 

145# =================================================================== 

146# --- Process.net_connections() 'kind' parameter mapping 

147# =================================================================== 

148 

149 

150conn_tmap = { 

151 "all": ([AF_INET, AF_INET6, AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]), 

152 "tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]), 

153 "tcp4": ([AF_INET], [SOCK_STREAM]), 

154 "udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]), 

155 "udp4": ([AF_INET], [SOCK_DGRAM]), 

156 "inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]), 

157 "inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]), 

158 "inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]), 

159} 

160 

161if AF_INET6 is not None: 

162 conn_tmap.update({ 

163 "tcp6": ([AF_INET6], [SOCK_STREAM]), 

164 "udp6": ([AF_INET6], [SOCK_DGRAM]), 

165 }) 

166 

167if AF_UNIX is not None and not SUNOS: 

168 conn_tmap.update({"unix": ([AF_UNIX], [SOCK_STREAM, SOCK_DGRAM])}) 

169 

170 

171# ===================================================================== 

172# --- Exceptions 

173# ===================================================================== 

174 

175 

176class Error(Exception): 

177 """Base exception class. All other psutil exceptions inherit 

178 from this one. 

179 """ 

180 

181 __module__ = 'psutil' 

182 

183 def _infodict(self, attrs): 

184 info = collections.OrderedDict() 

185 for name in attrs: 

186 value = getattr(self, name, None) 

187 if value or (name == "pid" and value == 0): 

188 info[name] = value 

189 return info 

190 

191 def __str__(self): 

192 # invoked on `raise Error` 

193 info = self._infodict(("pid", "ppid", "name")) 

194 if info: 

195 details = "({})".format( 

196 ", ".join([f"{k}={v!r}" for k, v in info.items()]) 

197 ) 

198 else: 

199 details = None 

200 return " ".join([x for x in (getattr(self, "msg", ""), details) if x]) 

201 

202 def __repr__(self): 

203 # invoked on `repr(Error)` 

204 info = self._infodict(("pid", "ppid", "name", "seconds", "msg")) 

205 details = ", ".join([f"{k}={v!r}" for k, v in info.items()]) 

206 return f"psutil.{self.__class__.__name__}({details})" 

207 

208 

209class NoSuchProcess(Error): 

210 """Exception raised when a process with a certain PID doesn't 

211 or no longer exists. 

212 """ 

213 

214 __module__ = 'psutil' 

215 

216 def __init__(self, pid, name=None, msg=None): 

217 Error.__init__(self) 

218 self.pid = pid 

219 self.name = name 

220 self.msg = msg or "process no longer exists" 

221 

222 def __reduce__(self): 

223 return (self.__class__, (self.pid, self.name, self.msg)) 

224 

225 

226class ZombieProcess(NoSuchProcess): 

227 """Exception raised when querying a zombie process. This is 

228 raised on macOS, BSD and Solaris only, and not always: depending 

229 on the query the OS may be able to succeed anyway. 

230 On Linux all zombie processes are querable (hence this is never 

231 raised). Windows doesn't have zombie processes. 

232 """ 

233 

234 __module__ = 'psutil' 

235 

236 def __init__(self, pid, name=None, ppid=None, msg=None): 

237 NoSuchProcess.__init__(self, pid, name, msg) 

238 self.ppid = ppid 

239 self.msg = msg or "PID still exists but it's a zombie" 

240 

241 def __reduce__(self): 

242 return (self.__class__, (self.pid, self.name, self.ppid, self.msg)) 

243 

244 

245class AccessDenied(Error): 

246 """Exception raised when permission to perform an action is denied.""" 

247 

248 __module__ = 'psutil' 

249 

250 def __init__(self, pid=None, name=None, msg=None): 

251 Error.__init__(self) 

252 self.pid = pid 

253 self.name = name 

254 self.msg = msg or "" 

255 

256 def __reduce__(self): 

257 return (self.__class__, (self.pid, self.name, self.msg)) 

258 

259 

260class TimeoutExpired(Error): 

261 """Raised on Process.wait(timeout) if timeout expires and process 

262 is still alive. 

263 """ 

264 

265 __module__ = 'psutil' 

266 

267 def __init__(self, seconds, pid=None, name=None): 

268 Error.__init__(self) 

269 self.seconds = seconds 

270 self.pid = pid 

271 self.name = name 

272 self.msg = f"timeout after {seconds} seconds" 

273 

274 def __reduce__(self): 

275 return (self.__class__, (self.seconds, self.pid, self.name)) 

276 

277 

278# =================================================================== 

279# --- utils 

280# =================================================================== 

281 

282 

283def usage_percent(used, total, round_=None): 

284 """Calculate percentage usage of 'used' against 'total'.""" 

285 try: 

286 ret = (float(used) / total) * 100 

287 except ZeroDivisionError: 

288 return 0.0 

289 else: 

290 if round_ is not None: 

291 ret = round(ret, round_) 

292 return ret 

293 

294 

295def memoize(fun): 

296 """A simple memoize decorator for functions supporting (hashable) 

297 positional arguments. 

298 It also provides a cache_clear() function for clearing the cache: 

299 

300 >>> @memoize 

301 ... def foo() 

302 ... return 1 

303 ... 

304 >>> foo() 

305 1 

306 >>> foo.cache_clear() 

307 >>> 

308 

309 It supports: 

310 - functions 

311 - classes (acts as a @singleton) 

312 - staticmethods 

313 - classmethods 

314 

315 It does NOT support: 

316 - methods 

317 """ 

318 

319 @functools.wraps(fun) 

320 def wrapper(*args, **kwargs): 

321 key = (args, frozenset(sorted(kwargs.items()))) 

322 try: 

323 return cache[key] 

324 except KeyError: 

325 try: 

326 ret = cache[key] = fun(*args, **kwargs) 

327 except Exception as err: 

328 raise err from None 

329 return ret 

330 

331 def cache_clear(): 

332 """Clear cache.""" 

333 cache.clear() 

334 

335 cache = {} 

336 wrapper.cache_clear = cache_clear 

337 return wrapper 

338 

339 

340def memoize_when_activated(fun): 

341 """A memoize decorator which is disabled by default. It can be 

342 activated and deactivated on request. 

343 For efficiency reasons it can be used only against class methods 

344 accepting no arguments. 

345 

346 >>> class Foo: 

347 ... @memoize 

348 ... def foo() 

349 ... print(1) 

350 ... 

351 >>> f = Foo() 

352 >>> # deactivated (default) 

353 >>> foo() 

354 1 

355 >>> foo() 

356 1 

357 >>> 

358 >>> # activated 

359 >>> foo.cache_activate(self) 

360 >>> foo() 

361 1 

362 >>> foo() 

363 >>> foo() 

364 >>> 

365 """ 

366 

367 @functools.wraps(fun) 

368 def wrapper(self): 

369 try: 

370 # case 1: we previously entered oneshot() ctx 

371 ret = self._cache[fun] 

372 except AttributeError: 

373 # case 2: we never entered oneshot() ctx 

374 try: 

375 return fun(self) 

376 except Exception as err: 

377 raise err from None 

378 except KeyError: 

379 # case 3: we entered oneshot() ctx but there's no cache 

380 # for this entry yet 

381 try: 

382 ret = fun(self) 

383 except Exception as err: 

384 raise err from None 

385 try: 

386 self._cache[fun] = ret 

387 except AttributeError: 

388 # multi-threading race condition, see: 

389 # https://github.com/giampaolo/psutil/issues/1948 

390 pass 

391 return ret 

392 

393 def cache_activate(proc): 

394 """Activate cache. Expects a Process instance. Cache will be 

395 stored as a "_cache" instance attribute. 

396 """ 

397 proc._cache = {} 

398 

399 def cache_deactivate(proc): 

400 """Deactivate and clear cache.""" 

401 try: 

402 del proc._cache 

403 except AttributeError: 

404 pass 

405 

406 wrapper.cache_activate = cache_activate 

407 wrapper.cache_deactivate = cache_deactivate 

408 return wrapper 

409 

410 

411def isfile_strict(path): 

412 """Same as os.path.isfile() but does not swallow EACCES / EPERM 

413 exceptions, see: 

414 http://mail.python.org/pipermail/python-dev/2012-June/120787.html. 

415 """ 

416 try: 

417 st = os.stat(path) 

418 except PermissionError: 

419 raise 

420 except OSError: 

421 return False 

422 else: 

423 return stat.S_ISREG(st.st_mode) 

424 

425 

426def path_exists_strict(path): 

427 """Same as os.path.exists() but does not swallow EACCES / EPERM 

428 exceptions. See: 

429 http://mail.python.org/pipermail/python-dev/2012-June/120787.html. 

430 """ 

431 try: 

432 os.stat(path) 

433 except PermissionError: 

434 raise 

435 except OSError: 

436 return False 

437 else: 

438 return True 

439 

440 

441def supports_ipv6(): 

442 """Return True if IPv6 is supported on this platform.""" 

443 if not socket.has_ipv6 or AF_INET6 is None: 

444 return False 

445 try: 

446 with socket.socket(AF_INET6, socket.SOCK_STREAM) as sock: 

447 sock.bind(("::1", 0)) 

448 return True 

449 except OSError: 

450 return False 

451 

452 

453def parse_environ_block(data): 

454 """Parse a C environ block of environment variables into a dictionary.""" 

455 # The block is usually raw data from the target process. It might contain 

456 # trailing garbage and lines that do not look like assignments. 

457 ret = {} 

458 pos = 0 

459 

460 # localize global variable to speed up access. 

461 WINDOWS_ = WINDOWS 

462 while True: 

463 next_pos = data.find("\0", pos) 

464 # nul byte at the beginning or double nul byte means finish 

465 if next_pos <= pos: 

466 break 

467 # there might not be an equals sign 

468 equal_pos = data.find("=", pos, next_pos) 

469 if equal_pos > pos: 

470 key = data[pos:equal_pos] 

471 value = data[equal_pos + 1 : next_pos] 

472 # Windows expects environment variables to be uppercase only 

473 if WINDOWS_: 

474 key = key.upper() 

475 ret[key] = value 

476 pos = next_pos + 1 

477 

478 return ret 

479 

480 

481def sockfam_to_enum(num): 

482 """Convert a numeric socket family value to an IntEnum member. 

483 If it's not a known member, return the numeric value itself. 

484 """ 

485 try: 

486 return socket.AddressFamily(num) 

487 except ValueError: 

488 return num 

489 

490 

491def socktype_to_enum(num): 

492 """Convert a numeric socket type value to an IntEnum member. 

493 If it's not a known member, return the numeric value itself. 

494 """ 

495 try: 

496 return socket.SocketKind(num) 

497 except ValueError: 

498 return num 

499 

500 

501def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None): 

502 """Convert a raw connection tuple to a proper ntuple.""" 

503 from . import _ntuples as ntp 

504 

505 if fam in {socket.AF_INET, AF_INET6}: 

506 if laddr: 

507 laddr = ntp.addr(*laddr) 

508 if raddr: 

509 raddr = ntp.addr(*raddr) 

510 if type_ == socket.SOCK_STREAM and fam in {AF_INET, AF_INET6}: 

511 status = status_map.get(status, CONN_NONE) 

512 else: 

513 status = CONN_NONE # ignore whatever C returned to us 

514 fam = sockfam_to_enum(fam) 

515 type_ = socktype_to_enum(type_) 

516 if pid is None: 

517 return ntp.pconn(fd, fam, type_, laddr, raddr, status) 

518 else: 

519 return ntp.sconn(fd, fam, type_, laddr, raddr, status, pid) 

520 

521 

522def broadcast_addr(addr): 

523 """Given the address ntuple returned by ``net_if_addrs()`` 

524 calculates the broadcast address. 

525 """ 

526 import ipaddress 

527 

528 if not addr.address or not addr.netmask: 

529 return None 

530 if addr.family == socket.AF_INET: 

531 return str( 

532 ipaddress.IPv4Network( 

533 f"{addr.address}/{addr.netmask}", strict=False 

534 ).broadcast_address 

535 ) 

536 if addr.family == socket.AF_INET6: 

537 return str( 

538 ipaddress.IPv6Network( 

539 f"{addr.address}/{addr.netmask}", strict=False 

540 ).broadcast_address 

541 ) 

542 

543 

544def deprecated_method(replacement): 

545 """A decorator which can be used to mark a method as deprecated 

546 'replcement' is the method name which will be called instead. 

547 """ 

548 

549 def outer(fun): 

550 msg = ( 

551 f"{fun.__name__}() is deprecated and will be removed; use" 

552 f" {replacement}() instead" 

553 ) 

554 if fun.__doc__ is None: 

555 fun.__doc__ = msg 

556 

557 @functools.wraps(fun) 

558 def inner(self, *args, **kwargs): 

559 warnings.warn(msg, category=DeprecationWarning, stacklevel=2) 

560 return getattr(self, replacement)(*args, **kwargs) 

561 

562 return inner 

563 

564 return outer 

565 

566 

567class _WrapNumbers: 

568 """Watches numbers so that they don't overflow and wrap 

569 (reset to zero). 

570 """ 

571 

572 def __init__(self): 

573 self.lock = threading.Lock() 

574 self.cache = {} 

575 self.reminders = {} 

576 self.reminder_keys = {} 

577 

578 def _add_dict(self, input_dict, name): 

579 assert name not in self.cache 

580 assert name not in self.reminders 

581 assert name not in self.reminder_keys 

582 self.cache[name] = input_dict 

583 self.reminders[name] = collections.defaultdict(int) 

584 self.reminder_keys[name] = collections.defaultdict(set) 

585 

586 def _remove_dead_reminders(self, input_dict, name): 

587 """In case the number of keys changed between calls (e.g. a 

588 disk disappears) this removes the entry from self.reminders. 

589 """ 

590 old_dict = self.cache[name] 

591 gone_keys = set(old_dict.keys()) - set(input_dict.keys()) 

592 for gone_key in gone_keys: 

593 for remkey in self.reminder_keys[name][gone_key]: 

594 del self.reminders[name][remkey] 

595 del self.reminder_keys[name][gone_key] 

596 

597 def run(self, input_dict, name): 

598 """Cache dict and sum numbers which overflow and wrap. 

599 Return an updated copy of `input_dict`. 

600 """ 

601 if name not in self.cache: 

602 # This was the first call. 

603 self._add_dict(input_dict, name) 

604 return input_dict 

605 

606 self._remove_dead_reminders(input_dict, name) 

607 

608 old_dict = self.cache[name] 

609 new_dict = {} 

610 for key in input_dict: 

611 input_tuple = input_dict[key] 

612 try: 

613 old_tuple = old_dict[key] 

614 except KeyError: 

615 # The input dict has a new key (e.g. a new disk or NIC) 

616 # which didn't exist in the previous call. 

617 new_dict[key] = input_tuple 

618 continue 

619 

620 bits = [] 

621 for i in range(len(input_tuple)): 

622 input_value = input_tuple[i] 

623 old_value = old_tuple[i] 

624 remkey = (key, i) 

625 if input_value < old_value: 

626 # it wrapped! 

627 self.reminders[name][remkey] += old_value 

628 self.reminder_keys[name][key].add(remkey) 

629 bits.append(input_value + self.reminders[name][remkey]) 

630 

631 new_dict[key] = tuple(bits) 

632 

633 self.cache[name] = input_dict 

634 return new_dict 

635 

636 def cache_clear(self, name=None): 

637 """Clear the internal cache, optionally only for function 'name'.""" 

638 with self.lock: 

639 if name is None: 

640 self.cache.clear() 

641 self.reminders.clear() 

642 self.reminder_keys.clear() 

643 else: 

644 self.cache.pop(name, None) 

645 self.reminders.pop(name, None) 

646 self.reminder_keys.pop(name, None) 

647 

648 def cache_info(self): 

649 """Return internal cache dicts as a tuple of 3 elements.""" 

650 with self.lock: 

651 return (self.cache, self.reminders, self.reminder_keys) 

652 

653 

654def wrap_numbers(input_dict, name): 

655 """Given an `input_dict` and a function `name`, adjust the numbers 

656 which "wrap" (restart from zero) across different calls by adding 

657 "old value" to "new value" and return an updated dict. 

658 """ 

659 with _wn.lock: 

660 return _wn.run(input_dict, name) 

661 

662 

663_wn = _WrapNumbers() 

664wrap_numbers.cache_clear = _wn.cache_clear 

665wrap_numbers.cache_info = _wn.cache_info 

666 

667 

668# The read buffer size for open() builtin. This (also) dictates how 

669# much data we read(2) when iterating over file lines as in: 

670# >>> with open(file) as f: 

671# ... for line in f: 

672# ... ... 

673# Default per-line buffer size for binary files is 1K. For text files 

674# is 8K. We use a bigger buffer (32K) in order to have more consistent 

675# results when reading /proc pseudo files on Linux, see: 

676# https://github.com/giampaolo/psutil/issues/2050 

677# https://github.com/giampaolo/psutil/issues/708 

678FILE_READ_BUFFER_SIZE = 32 * 1024 

679 

680 

681def open_binary(fname): 

682 return open(fname, "rb", buffering=FILE_READ_BUFFER_SIZE) 

683 

684 

685def open_text(fname): 

686 """Open a file in text mode by using the proper FS encoding and 

687 en/decoding error handlers. 

688 """ 

689 # See: 

690 # https://github.com/giampaolo/psutil/issues/675 

691 # https://github.com/giampaolo/psutil/pull/733 

692 fobj = open( # noqa: SIM115 

693 fname, 

694 buffering=FILE_READ_BUFFER_SIZE, 

695 encoding=ENCODING, 

696 errors=ENCODING_ERRS, 

697 ) 

698 try: 

699 # Dictates per-line read(2) buffer size. Defaults is 8k. See: 

700 # https://github.com/giampaolo/psutil/issues/2050#issuecomment-1013387546 

701 fobj._CHUNK_SIZE = FILE_READ_BUFFER_SIZE 

702 except AttributeError: 

703 pass 

704 except Exception: 

705 fobj.close() 

706 raise 

707 

708 return fobj 

709 

710 

711def cat(fname, fallback=_DEFAULT, _open=open_text): 

712 """Read entire file content and return it as a string. File is 

713 opened in text mode. If specified, `fallback` is the value 

714 returned in case of error, either if the file does not exist or 

715 it can't be read(). 

716 """ 

717 if fallback is _DEFAULT: 

718 with _open(fname) as f: 

719 return f.read() 

720 else: 

721 try: 

722 with _open(fname) as f: 

723 return f.read() 

724 except OSError: 

725 return fallback 

726 

727 

728def bcat(fname, fallback=_DEFAULT): 

729 """Same as above but opens file in binary mode.""" 

730 return cat(fname, fallback=fallback, _open=open_binary) 

731 

732 

733def bytes2human(n, format="%(value).1f%(symbol)s"): 

734 """Used by various scripts. See: https://code.activestate.com/recipes/578019-bytes-to-human-human-to-bytes-converter/?in=user-4178764. 

735 

736 >>> bytes2human(10000) 

737 '9.8K' 

738 >>> bytes2human(100001221) 

739 '95.4M' 

740 """ 

741 symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') 

742 prefix = {} 

743 for i, s in enumerate(symbols[1:]): 

744 prefix[s] = 1 << (i + 1) * 10 

745 for symbol in reversed(symbols[1:]): 

746 if abs(n) >= prefix[symbol]: 

747 value = float(n) / prefix[symbol] 

748 return format % locals() 

749 return format % dict(symbol=symbols[0], value=n) 

750 

751 

752def get_procfs_path(): 

753 """Return updated psutil.PROCFS_PATH constant.""" 

754 return sys.modules['psutil'].PROCFS_PATH 

755 

756 

757def decode(s): 

758 return s.decode(encoding=ENCODING, errors=ENCODING_ERRS) 

759 

760 

761# ===================================================================== 

762# --- shell utils 

763# ===================================================================== 

764 

765 

766@memoize 

767def term_supports_colors(file=sys.stdout): # pragma: no cover 

768 if os.name == 'nt': 

769 return True 

770 try: 

771 import curses 

772 

773 assert file.isatty() 

774 curses.setupterm() 

775 return curses.tigetnum("colors") > 0 

776 except Exception: # noqa: BLE001 

777 return False 

778 

779 

780def hilite(s, color=None, bold=False): # pragma: no cover 

781 """Return an highlighted version of 'string'.""" 

782 if not term_supports_colors(): 

783 return s 

784 attr = [] 

785 colors = dict( 

786 blue='34', 

787 brown='33', 

788 darkgrey='30', 

789 green='32', 

790 grey='37', 

791 lightblue='36', 

792 red='91', 

793 violet='35', 

794 yellow='93', 

795 ) 

796 colors[None] = '29' 

797 try: 

798 color = colors[color] 

799 except KeyError: 

800 msg = f"invalid color {color!r}; choose amongst {list(colors.keys())}" 

801 raise ValueError(msg) from None 

802 attr.append(color) 

803 if bold: 

804 attr.append('1') 

805 return f"\x1b[{';'.join(attr)}m{s}\x1b[0m" 

806 

807 

808def print_color( 

809 s, color=None, bold=False, file=sys.stdout 

810): # pragma: no cover 

811 """Print a colorized version of string.""" 

812 if not term_supports_colors(): 

813 print(s, file=file) 

814 elif POSIX: 

815 print(hilite(s, color, bold), file=file) 

816 else: 

817 import ctypes 

818 

819 DEFAULT_COLOR = 7 

820 GetStdHandle = ctypes.windll.Kernel32.GetStdHandle 

821 SetConsoleTextAttribute = ( 

822 ctypes.windll.Kernel32.SetConsoleTextAttribute 

823 ) 

824 

825 colors = dict(green=2, red=4, brown=6, yellow=6) 

826 colors[None] = DEFAULT_COLOR 

827 try: 

828 color = colors[color] 

829 except KeyError: 

830 msg = ( 

831 f"invalid color {color!r}; choose between" 

832 f" {list(colors.keys())!r}" 

833 ) 

834 raise ValueError(msg) from None 

835 if bold and color <= 7: 

836 color += 8 

837 

838 handle_id = -12 if file is sys.stderr else -11 

839 GetStdHandle.restype = ctypes.c_ulong 

840 handle = GetStdHandle(handle_id) 

841 SetConsoleTextAttribute(handle, color) 

842 try: 

843 print(s, file=file) 

844 finally: 

845 SetConsoleTextAttribute(handle, DEFAULT_COLOR) 

846 

847 

848def debug(msg): 

849 """If PSUTIL_DEBUG env var is set, print a debug message to stderr.""" 

850 if PSUTIL_DEBUG: 

851 import inspect 

852 

853 fname, lineno, _, _lines, _index = inspect.getframeinfo( 

854 inspect.currentframe().f_back 

855 ) 

856 if isinstance(msg, Exception): 

857 if isinstance(msg, OSError): 

858 # ...because str(exc) may contain info about the file name 

859 msg = f"ignoring {msg}" 

860 else: 

861 msg = f"ignoring {msg!r}" 

862 print( # noqa: T201 

863 f"psutil-debug [{fname}:{lineno}]> {msg}", file=sys.stderr 

864 )