1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Common objects shared by __init__.py and _ps*.py modules.
6
7Note: this module is imported by setup.py, so it should not import
8psutil or third-party modules.
9"""
10
11import collections
12import enum
13import functools
14import os
15import socket
16import stat
17import sys
18import threading
19import warnings
20from socket import AF_INET
21from socket import SOCK_DGRAM
22from socket import SOCK_STREAM
23
24try:
25 from socket import AF_INET6
26except ImportError:
27 AF_INET6 = None
28try:
29 from socket import AF_UNIX
30except ImportError:
31 AF_UNIX = None
32
33
# Debug mode is enabled when the PSUTIL_DEBUG environment variable is set to
# any non-empty string. The value itself is not parsed, so e.g. "0" also
# enables it.
PSUTIL_DEBUG = bool(os.getenv('PSUTIL_DEBUG'))
# Unique sentinel used as a default argument value in order to tell "argument
# not passed" apart from an explicit `None` (see cat() / bcat()).
_DEFAULT = object()
36
37# fmt: off
# Names exported by `from <this module> import *`.
# The NIC_DUPLEX_* names are not assigned literally in this file: they are
# injected into the module namespace via
# `globals().update(NicDuplex.__members__)`, hence the F822 suppression.
# NOTE(review): some public-looking names defined in this module (e.g. AIX,
# POWER_TIME_UNKNOWN, POWER_TIME_UNLIMITED, broadcast_addr) are not listed
# here - confirm whether that is intentional.
__all__ = [
    # OS constants
    'FREEBSD', 'BSD', 'LINUX', 'NETBSD', 'OPENBSD', 'MACOS', 'OSX', 'POSIX',
    'SUNOS', 'WINDOWS',
    # connection constants
    'CONN_CLOSE', 'CONN_CLOSE_WAIT', 'CONN_CLOSING', 'CONN_ESTABLISHED',
    'CONN_FIN_WAIT1', 'CONN_FIN_WAIT2', 'CONN_LAST_ACK', 'CONN_LISTEN',
    'CONN_NONE', 'CONN_SYN_RECV', 'CONN_SYN_SENT', 'CONN_TIME_WAIT',
    # net constants
    'NIC_DUPLEX_FULL', 'NIC_DUPLEX_HALF', 'NIC_DUPLEX_UNKNOWN',  # noqa: F822
    # process status constants
    'STATUS_DEAD', 'STATUS_DISK_SLEEP', 'STATUS_IDLE', 'STATUS_LOCKED',
    'STATUS_RUNNING', 'STATUS_SLEEPING', 'STATUS_STOPPED', 'STATUS_SUSPENDED',
    'STATUS_TRACING_STOP', 'STATUS_WAITING', 'STATUS_WAKE_KILL',
    'STATUS_WAKING', 'STATUS_ZOMBIE', 'STATUS_PARKED',
    # other constants
    'ENCODING', 'ENCODING_ERRS', 'AF_INET6',
    # utility functions
    'conn_tmap', 'deprecated_method', 'isfile_strict', 'memoize',
    'parse_environ_block', 'path_exists_strict', 'usage_percent',
    'supports_ipv6', 'sockfam_to_enum', 'socktype_to_enum', "wrap_numbers",
    'open_text', 'open_binary', 'cat', 'bcat',
    'bytes2human', 'conn_to_ntuple', 'debug',
    # shell utils
    'hilite', 'term_supports_colors', 'print_color',
]
64# fmt: on
65
66
67# ===================================================================
68# --- OS constants
69# ===================================================================
70
71
# Boolean platform flags, computed once at import time from `os.name` and
# `sys.platform`.
POSIX = os.name == "posix"
WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
MACOS = sys.platform.startswith("darwin")
OSX = MACOS  # deprecated alias
# MidnightBSD is treated as FreeBSD.
FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
OPENBSD = sys.platform.startswith("openbsd")
NETBSD = sys.platform.startswith("netbsd")
# True on any of the BSD variants above.
BSD = FREEBSD or OPENBSD or NETBSD
SUNOS = sys.platform.startswith(("sunos", "solaris"))
AIX = sys.platform.startswith("aix")
83
84
85# ===================================================================
86# --- API constants
87# ===================================================================
88
89
# Process.status()
# Plain string constants; the trailing comments list the platforms the
# status applies to.
STATUS_RUNNING = "running"
STATUS_SLEEPING = "sleeping"
STATUS_DISK_SLEEP = "disk-sleep"
STATUS_STOPPED = "stopped"
STATUS_TRACING_STOP = "tracing-stop"
STATUS_ZOMBIE = "zombie"
STATUS_DEAD = "dead"
STATUS_WAKE_KILL = "wake-kill"
STATUS_WAKING = "waking"
STATUS_IDLE = "idle"  # Linux, macOS, FreeBSD
STATUS_LOCKED = "locked"  # FreeBSD
STATUS_WAITING = "waiting"  # FreeBSD
STATUS_SUSPENDED = "suspended"  # NetBSD
STATUS_PARKED = "parked"  # Linux

# Process.net_connections() and psutil.net_connections()
# Connection status strings. CONN_NONE is also what conn_to_ntuple() falls
# back to when a status does not apply or is not found in the status map.
CONN_ESTABLISHED = "ESTABLISHED"
CONN_SYN_SENT = "SYN_SENT"
CONN_SYN_RECV = "SYN_RECV"
CONN_FIN_WAIT1 = "FIN_WAIT1"
CONN_FIN_WAIT2 = "FIN_WAIT2"
CONN_TIME_WAIT = "TIME_WAIT"
CONN_CLOSE = "CLOSE"
CONN_CLOSE_WAIT = "CLOSE_WAIT"
CONN_LAST_ACK = "LAST_ACK"
CONN_LISTEN = "LISTEN"
CONN_CLOSING = "CLOSING"
CONN_NONE = "NONE"
119
120
# net_if_stats()
class NicDuplex(enum.IntEnum):
    """Duplex mode of a network interface card, as an IntEnum."""

    NIC_DUPLEX_FULL = 2
    NIC_DUPLEX_HALF = 1
    NIC_DUPLEX_UNKNOWN = 0


# Expose the enum members as module-level names (NIC_DUPLEX_FULL, etc.).
globals().update(NicDuplex.__members__)
129
130
# sensors_battery()
class BatteryTime(enum.IntEnum):
    """Special (negative) values for the battery time left, as an IntEnum."""

    POWER_TIME_UNKNOWN = -1
    POWER_TIME_UNLIMITED = -2


# Expose the enum members as module-level names (POWER_TIME_UNKNOWN, etc.).
globals().update(BatteryTime.__members__)
138
139# --- others
140
# Encoding and error handler used by the interpreter for file system names;
# used in this module by open_text() and decode().
ENCODING = sys.getfilesystemencoding()
ENCODING_ERRS = sys.getfilesystemencodeerrors()
143
144
145# ===================================================================
146# --- Process.net_connections() 'kind' parameter mapping
147# ===================================================================
148
149
# Maps each accepted `kind` string to a pair:
#   ([address families], [socket types])
# NOTE(review): AF_INET6 / AF_UNIX are set to None when the socket module
# does not provide them (see the guarded imports at the top of this file).
# In that case None still ends up in the family lists of the entries defined
# right below, including the unconditional "inet6" one; only "tcp6", "udp6"
# and "unix" are guarded.
conn_tmap = {
    "all": ([AF_INET, AF_INET6, AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]),
    "tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]),
    "tcp4": ([AF_INET], [SOCK_STREAM]),
    "udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]),
    "udp4": ([AF_INET], [SOCK_DGRAM]),
    "inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
    "inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]),
    "inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
}

# IPv6-only kinds are registered only if the platform defines AF_INET6.
if AF_INET6 is not None:
    conn_tmap.update({
        "tcp6": ([AF_INET6], [SOCK_STREAM]),
        "udp6": ([AF_INET6], [SOCK_DGRAM]),
    })

# The "unix" kind requires AF_UNIX and is never registered on SunOS.
if AF_UNIX is not None and not SUNOS:
    conn_tmap.update({"unix": ([AF_UNIX], [SOCK_STREAM, SOCK_DGRAM])})
169
170
171# =====================================================================
172# --- Exceptions
173# =====================================================================
174
175
class Error(Exception):
    """Root of the psutil exception hierarchy: every other psutil
    exception derives from this class.
    """

    __module__ = 'psutil'

    def _infodict(self, attrs):
        """Return an ordered mapping of those attributes listed in
        `attrs` which are set on the instance to a truthy value.
        A `pid` equal to 0 is kept even though it is falsy.
        """
        found = collections.OrderedDict()
        for attr in attrs:
            val = getattr(self, attr, None)
            if not val and not (attr == "pid" and val == 0):
                continue
            found[attr] = val
        return found

    def __str__(self):
        # invoked on `raise Error`
        info = self._infodict(("pid", "ppid", "name"))
        parts = [getattr(self, "msg", "")]
        if info:
            pairs = [f"{k}={v!r}" for k, v in info.items()]
            parts.append("({})".format(", ".join(pairs)))
        # Empty pieces (no message and / or no details) are left out.
        return " ".join([part for part in parts if part])

    def __repr__(self):
        # invoked on `repr(Error)`
        info = self._infodict(("pid", "ppid", "name", "seconds", "msg"))
        pairs = [f"{k}={v!r}" for k, v in info.items()]
        details = ", ".join(pairs)
        return f"psutil.{self.__class__.__name__}({details})"
207
208
class NoSuchProcess(Error):
    """Raised when a process with a given PID does not exist, or does
    not exist anymore.
    """

    __module__ = 'psutil'

    def __init__(self, pid, name=None, msg=None):
        Error.__init__(self)
        if not msg:
            msg = "process no longer exists"
        self.pid, self.name, self.msg = pid, name, msg

    def __reduce__(self):
        # Pickle support: rebuild the exception from its constructor args.
        ctor_args = (self.pid, self.name, self.msg)
        return (self.__class__, ctor_args)
224
225
class ZombieProcess(NoSuchProcess):
    """Raised when querying a zombie process. This happens on macOS,
    BSD and Solaris only, and not always: depending on the query the
    OS may be able to succeed anyway.
    On Linux all zombie processes are queryable (hence this is never
    raised). Windows doesn't have zombie processes.
    """

    __module__ = 'psutil'

    def __init__(self, pid, name=None, ppid=None, msg=None):
        NoSuchProcess.__init__(self, pid, name, msg)
        self.ppid = ppid
        if not msg:
            # Replace the default message set by the parent class.
            self.msg = "PID still exists but it's a zombie"

    def __reduce__(self):
        # Pickle support: rebuild the exception from its constructor args.
        ctor_args = (self.pid, self.name, self.ppid, self.msg)
        return (self.__class__, ctor_args)
243
244
class AccessDenied(Error):
    """Raised when the permission to perform an action is denied."""

    __module__ = 'psutil'

    def __init__(self, pid=None, name=None, msg=None):
        Error.__init__(self)
        self.pid, self.name = pid, name
        # A missing / falsy message is normalized to an empty string.
        self.msg = msg if msg else ""

    def __reduce__(self):
        # Pickle support: rebuild the exception from its constructor args.
        ctor_args = (self.pid, self.name, self.msg)
        return (self.__class__, ctor_args)
258
259
class TimeoutExpired(Error):
    """Raised by Process.wait(timeout) when the timeout expires and
    the process is still alive.
    """

    __module__ = 'psutil'

    def __init__(self, seconds, pid=None, name=None):
        Error.__init__(self)
        self.seconds, self.pid, self.name = seconds, pid, name
        # The message is always derived from `seconds`.
        self.msg = f"timeout after {seconds} seconds"

    def __reduce__(self):
        # Pickle support: `msg` is not passed as it is rebuilt by __init__.
        ctor_args = (self.seconds, self.pid, self.name)
        return (self.__class__, ctor_args)
276
277
278# ===================================================================
279# --- utils
280# ===================================================================
281
282
def usage_percent(used, total, round_=None):
    """Return `used` as a percentage of `total`.

    If `total` is zero 0.0 is returned. If `round_` is not None the
    result is rounded to that many decimal digits.
    """
    try:
        ratio = float(used) / total
    except ZeroDivisionError:
        return 0.0
    percent = ratio * 100
    return percent if round_ is None else round(percent, round_)
293
294
def memoize(fun):
    """A simple memoize decorator for functions supporting (hashable)
    positional and keyword arguments.
    It also provides a cache_clear() function for clearing the cache:

    >>> @memoize
    ... def foo():
    ...     return 1
    ...
    >>> foo()
    1
    >>> foo.cache_clear()
    >>>

    It supports:
     - functions
     - classes (acts as a @singleton)
     - staticmethods
     - classmethods

    It does NOT support:
     - methods

    Exceptions raised by the decorated function propagate unchanged
    (nothing is cached in that case).
    """
    cache = {}

    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        # A frozenset is order-insensitive already, hence there's no
        # need to sort the keyword arguments.
        key = (args, frozenset(kwargs.items()))
        try:
            return cache[key]
        except KeyError:
            pass
        # The function is deliberately called outside of the `except`
        # block: this way its exceptions don't get the KeyError as
        # implicit context, and any explicit `raise X from Y` chaining
        # done by the function itself is preserved.
        ret = cache[key] = fun(*args, **kwargs)
        return ret

    def cache_clear():
        """Clear cache."""
        cache.clear()

    wrapper.cache_clear = cache_clear
    return wrapper
338
339
def memoize_when_activated(fun):
    """A memoize decorator which is disabled by default. It can be
    activated and deactivated on request.
    For efficiency reasons it can be used only against instance
    methods accepting no arguments (other than `self`).

    >>> class Foo:
    ...     @memoize_when_activated
    ...     def foo(self):
    ...         print(1)
    ...
    >>> f = Foo()
    >>> # deactivated (default)
    >>> f.foo()
    1
    >>> f.foo()
    1
    >>>
    >>> # activated
    >>> Foo.foo.cache_activate(f)
    >>> f.foo()
    1
    >>> f.foo()
    >>> f.foo()
    >>>

    Exceptions raised by the decorated method propagate unchanged
    (nothing is cached in that case).
    """
    missing = object()

    @functools.wraps(fun)
    def wrapper(self):
        # Note: `fun` is always called outside of any `except` block,
        # so that its exceptions don't get an unrelated implicit
        # context and any explicit `raise X from Y` chaining done by
        # the method itself is preserved.
        cache = getattr(self, "_cache", missing)
        if cache is missing:
            # case 2: we never entered oneshot() ctx
            return fun(self)
        try:
            # case 1: we previously entered oneshot() ctx
            return cache[fun]
        except KeyError:
            # case 3: we entered oneshot() ctx but there's no cache
            # for this entry yet
            pass
        ret = fun(self)
        try:
            self._cache[fun] = ret
        except AttributeError:
            # multi-threading race condition, see:
            # https://github.com/giampaolo/psutil/issues/1948
            pass
        return ret

    def cache_activate(proc):
        """Activate cache. Expects a Process instance. Cache will be
        stored as a "_cache" instance attribute.
        """
        proc._cache = {}

    def cache_deactivate(proc):
        """Deactivate and clear cache."""
        try:
            del proc._cache
        except AttributeError:
            pass

    wrapper.cache_activate = cache_activate
    wrapper.cache_deactivate = cache_deactivate
    return wrapper
409
410
def isfile_strict(path):
    """Like os.path.isfile(), except that EACCES / EPERM errors are
    propagated instead of being swallowed, see:
    http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
    """
    try:
        mode = os.stat(path).st_mode
    except OSError as err:
        if isinstance(err, PermissionError):
            raise
        # Any other stat() failure means "not a file".
        return False
    return stat.S_ISREG(mode)
424
425
def path_exists_strict(path):
    """Like os.path.exists(), except that EACCES / EPERM errors are
    propagated instead of being swallowed. See:
    http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
    """
    try:
        os.stat(path)
    except OSError as err:
        if isinstance(err, PermissionError):
            raise
        # Any other stat() failure means the path is not there.
        return False
    return True
439
440
def supports_ipv6():
    """Tell whether IPv6 is usable on this platform, by trying to bind
    a TCP socket to the IPv6 loopback address.
    """
    if AF_INET6 is None or not socket.has_ipv6:
        return False
    try:
        with socket.socket(AF_INET6, socket.SOCK_STREAM) as probe:
            probe.bind(("::1", 0))
    except OSError:
        return False
    else:
        return True
451
452
def parse_environ_block(data):
    """Turn a C environ block (NUL-separated "KEY=VALUE" entries) into
    a dictionary.
    """
    # The block is usually raw data from the target process. It might contain
    # trailing garbage and lines that do not look like assignments.
    env = {}
    # localize global variable to speed up access.
    upcase_keys = WINDOWS
    start = 0
    while True:
        end = data.find("\0", start)
        if end <= start:
            # no more terminators (-1), a nul byte at the beginning or
            # a double nul byte: we're done
            break
        # entries without an equals sign (or starting with it) are skipped
        sep = data.find("=", start, end)
        if sep > start:
            name = data[start:sep]
            # Windows expects environment variables to be uppercase only
            if upcase_keys:
                name = name.upper()
            env[name] = data[sep + 1 : end]
        start = end + 1

    return env
479
480
def sockfam_to_enum(num):
    """Map a numeric socket family to the matching
    socket.AddressFamily member. Unknown values are returned as-is.
    """
    try:
        family = socket.AddressFamily(num)
    except ValueError:
        family = num
    return family
489
490
def socktype_to_enum(num):
    """Map a numeric socket type to the matching socket.SocketKind
    member. Unknown values are returned as-is.
    """
    try:
        kind = socket.SocketKind(num)
    except ValueError:
        kind = num
    return kind
499
500
def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None):
    """Build a connection ntuple out of raw connection data: `pconn`
    if `pid` is None, else `sconn` (which also carries the PID).
    """
    from . import _ntuples as ntp

    is_inet = fam in {socket.AF_INET, AF_INET6}
    if is_inet:
        # Non-empty IP addresses are converted into `addr` ntuples.
        if laddr:
            laddr = ntp.addr(*laddr)
        if raddr:
            raddr = ntp.addr(*raddr)
    if is_inet and type_ == socket.SOCK_STREAM:
        status = status_map.get(status, CONN_NONE)
    else:
        status = CONN_NONE  # ignore whatever C returned to us
    fields = (
        fd,
        sockfam_to_enum(fam),
        socktype_to_enum(type_),
        laddr,
        raddr,
        status,
    )
    if pid is None:
        return ntp.pconn(*fields)
    return ntp.sconn(*fields, pid)
520
521
def broadcast_addr(addr):
    """Calculate the broadcast address of `addr`, an address ntuple
    as returned by ``net_if_addrs()``. Return None if the address or
    the netmask are missing, or if the family is neither AF_INET nor
    AF_INET6.
    """
    import ipaddress

    if not (addr.address and addr.netmask):
        return None
    if addr.family == socket.AF_INET:
        network_cls = ipaddress.IPv4Network
    elif addr.family == socket.AF_INET6:
        network_cls = ipaddress.IPv6Network
    else:
        return None
    # strict=False: the address is allowed to have host bits set.
    network = network_cls(f"{addr.address}/{addr.netmask}", strict=False)
    return str(network.broadcast_address)
542
543
def deprecated_method(replacement):
    """A decorator which can be used to mark a method as deprecated.
    'replacement' is the name of the method which will be called
    instead (after emitting a DeprecationWarning).
    """

    def decorator(fun):
        msg = (
            f"{fun.__name__}() is deprecated and will be removed; use"
            f" {replacement}() instead"
        )
        # Undocumented methods get the deprecation notice as docstring.
        if fun.__doc__ is None:
            fun.__doc__ = msg

        @functools.wraps(fun)
        def forwarder(self, *args, **kwargs):
            warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
            target = getattr(self, replacement)
            return target(*args, **kwargs)

        return forwarder

    return decorator
565
566
567class _WrapNumbers:
568 """Watches numbers so that they don't overflow and wrap
569 (reset to zero).
570 """
571
572 def __init__(self):
573 self.lock = threading.Lock()
574 self.cache = {}
575 self.reminders = {}
576 self.reminder_keys = {}
577
578 def _add_dict(self, input_dict, name):
579 assert name not in self.cache
580 assert name not in self.reminders
581 assert name not in self.reminder_keys
582 self.cache[name] = input_dict
583 self.reminders[name] = collections.defaultdict(int)
584 self.reminder_keys[name] = collections.defaultdict(set)
585
586 def _remove_dead_reminders(self, input_dict, name):
587 """In case the number of keys changed between calls (e.g. a
588 disk disappears) this removes the entry from self.reminders.
589 """
590 old_dict = self.cache[name]
591 gone_keys = set(old_dict.keys()) - set(input_dict.keys())
592 for gone_key in gone_keys:
593 for remkey in self.reminder_keys[name][gone_key]:
594 del self.reminders[name][remkey]
595 del self.reminder_keys[name][gone_key]
596
597 def run(self, input_dict, name):
598 """Cache dict and sum numbers which overflow and wrap.
599 Return an updated copy of `input_dict`.
600 """
601 if name not in self.cache:
602 # This was the first call.
603 self._add_dict(input_dict, name)
604 return input_dict
605
606 self._remove_dead_reminders(input_dict, name)
607
608 old_dict = self.cache[name]
609 new_dict = {}
610 for key in input_dict:
611 input_tuple = input_dict[key]
612 try:
613 old_tuple = old_dict[key]
614 except KeyError:
615 # The input dict has a new key (e.g. a new disk or NIC)
616 # which didn't exist in the previous call.
617 new_dict[key] = input_tuple
618 continue
619
620 bits = []
621 for i in range(len(input_tuple)):
622 input_value = input_tuple[i]
623 old_value = old_tuple[i]
624 remkey = (key, i)
625 if input_value < old_value:
626 # it wrapped!
627 self.reminders[name][remkey] += old_value
628 self.reminder_keys[name][key].add(remkey)
629 bits.append(input_value + self.reminders[name][remkey])
630
631 new_dict[key] = tuple(bits)
632
633 self.cache[name] = input_dict
634 return new_dict
635
636 def cache_clear(self, name=None):
637 """Clear the internal cache, optionally only for function 'name'."""
638 with self.lock:
639 if name is None:
640 self.cache.clear()
641 self.reminders.clear()
642 self.reminder_keys.clear()
643 else:
644 self.cache.pop(name, None)
645 self.reminders.pop(name, None)
646 self.reminder_keys.pop(name, None)
647
648 def cache_info(self):
649 """Return internal cache dicts as a tuple of 3 elements."""
650 with self.lock:
651 return (self.cache, self.reminders, self.reminder_keys)
652
653
def wrap_numbers(input_dict, name):
    """Given an `input_dict` and a function `name`, return a dict
    where the numbers which "wrapped" (restarted from zero) since the
    previous call for the same `name` are adjusted by adding the
    "old value" back to the "new value".
    """
    with _wn.lock:
        adjusted = _wn.run(input_dict, name)
    return adjusted


# Module-wide tracker; its cache management methods are exposed as
# attributes of the public function.
_wn = _WrapNumbers()
wrap_numbers.cache_clear = _wn.cache_clear
wrap_numbers.cache_info = _wn.cache_info
666
667
# The read buffer size for the open() builtin. This (also) dictates how
# much data we read(2) when iterating over file lines as in:
#   >>> with open(file) as f:
#   ...    for line in f:
#   ...        ...
# The default per-line buffer size is 1K for binary files and 8K for
# text files. We use a bigger buffer (32K) in order to have more
# consistent results when reading /proc pseudo files on Linux, see:
# https://github.com/giampaolo/psutil/issues/2050
# https://github.com/giampaolo/psutil/issues/708
FILE_READ_BUFFER_SIZE = 32 * 1024
679
680
def open_binary(fname):
    """Open a file for reading in binary mode, by using a read buffer
    of FILE_READ_BUFFER_SIZE bytes.
    """
    return open(fname, mode="rb", buffering=FILE_READ_BUFFER_SIZE)
683
684
def open_text(fname):
    """Open a file for reading in text mode, by using the FS encoding
    and its error handler for decoding.
    """
    # See:
    # https://github.com/giampaolo/psutil/issues/675
    # https://github.com/giampaolo/psutil/pull/733
    fobj = open(  # noqa: SIM115
        fname,
        encoding=ENCODING,
        errors=ENCODING_ERRS,
        buffering=FILE_READ_BUFFER_SIZE,
    )
    try:
        # Dictates the per-line read(2) buffer size. Default is 8k. See:
        # https://github.com/giampaolo/psutil/issues/2050#issuecomment-1013387546
        fobj._CHUNK_SIZE = FILE_READ_BUFFER_SIZE
    except Exception as err:
        # Not being able to set the attribute is fine; on any other
        # error don't leak the file object.
        if not isinstance(err, AttributeError):
            fobj.close()
            raise

    return fobj
709
710
def cat(fname, fallback=_DEFAULT, _open=open_text):
    """Return the whole content of a file, which is opened in text
    mode (by default). If `fallback` is specified, it is returned in
    place of raising OSError, be it because the file does not exist
    or because it can't be read().
    """
    try:
        with _open(fname) as f:
            return f.read()
    except OSError:
        if fallback is _DEFAULT:
            # No fallback was passed: let the error through.
            raise
        return fallback
726
727
def bcat(fname, fallback=_DEFAULT):
    """Same as cat() but the file is opened in binary mode, hence
    bytes are returned.
    """
    return cat(fname, fallback, _open=open_binary)
731
732
def bytes2human(n, format="%(value).1f%(symbol)s"):
    """Convert a number of bytes into a human readable string.

    Used by various scripts. See: https://code.activestate.com/recipes/578019-bytes-to-human-human-to-bytes-converter/?in=user-4178764.

    `format` is a %-style template which is rendered with the `value`
    and `symbol` keys.

    >>> bytes2human(10000)
    '9.8K'
    >>> bytes2human(100001221)
    '95.4M'
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Map each symbol but 'B' to its threshold: K=2**10, M=2**20, etc.
    prefix = {}
    for i, s in enumerate(symbols[1:]):
        prefix[s] = 1 << (i + 1) * 10
    # Pick the biggest unit which is not greater than abs(n).
    for symbol in reversed(symbols[1:]):
        if abs(n) >= prefix[symbol]:
            value = float(n) / prefix[symbol]
            # NOTE: locals() exposes every local name of this function to
            # the template (not only `value` and `symbol`), so renaming a
            # local variable is a behavior change.
            return format % locals()
    # abs(n) is smaller than 1K: render it as bytes ('B'), unscaled.
    return format % dict(symbol=symbols[0], value=n)
750
751
def get_procfs_path():
    """Return the current value of the psutil.PROCFS_PATH constant,
    which is looked up on each call via sys.modules.
    """
    psutil_module = sys.modules['psutil']
    return psutil_module.PROCFS_PATH
755
756
def decode(s):
    """Decode bytes into a string, by using the FS encoding and its
    error handler.
    """
    return s.decode(ENCODING, ENCODING_ERRS)
759
760
761# =====================================================================
762# --- shell utils
763# =====================================================================
764
765
@memoize
def term_supports_colors(file=sys.stdout):  # pragma: no cover
    """Return True if `file` is a terminal which supports colors.

    On Windows this always returns True. Elsewhere `file` must be a
    TTY and curses must report a positive number of colors; any error
    (e.g. curses not being available) results in False.
    The result is cached per `file` (see @memoize).
    """
    if os.name == 'nt':
        return True
    try:
        import curses

        # Explicit check instead of `assert`, which is stripped when
        # Python runs with -O, making the TTY check silently vanish.
        if not file.isatty():
            return False
        curses.setupterm()
        return curses.tigetnum("colors") > 0
    except Exception:  # noqa: BLE001
        return False
778
779
def hilite(s, color=None, bold=False):  # pragma: no cover
    """Return `s` wrapped into the ANSI escape sequences for `color`
    (and bold, if requested). `s` is returned unchanged if the
    terminal does not support colors.
    Raise ValueError if `color` is not a known color name.
    """
    if not term_supports_colors():
        return s
    # Color name -> SGR code. None (the default) goes last so that the
    # error message below lists it last.
    palette = {
        'blue': '34',
        'brown': '33',
        'darkgrey': '30',
        'green': '32',
        'grey': '37',
        'lightblue': '36',
        'red': '91',
        'violet': '35',
        'yellow': '93',
        None: '29',
    }
    try:
        code = palette[color]
    except KeyError:
        msg = f"invalid color {color!r}; choose amongst {list(palette.keys())}"
        raise ValueError(msg) from None
    attr = [code, '1'] if bold else [code]
    return f"\x1b[{';'.join(attr)}m{s}\x1b[0m"
806
807
def print_color(
    s, color=None, bold=False, file=sys.stdout
):  # pragma: no cover
    """Print a colorized version of string `s` to `file`.

    If `file` does not support colors `s` is printed as-is. On POSIX
    the string is colorized via hilite() (ANSI escape sequences), else
    (Windows) via the console API.
    Raise ValueError if `color` is not a known color name.
    """
    # Probe the stream we are actually going to write to (not the default
    # sys.stdout), so that no escape sequence ends up in a redirected
    # `file` just because stdout happens to be a terminal.
    if not term_supports_colors(file=file):
        print(s, file=file)
    elif POSIX:
        print(hilite(s, color, bold), file=file)
    else:
        import ctypes

        DEFAULT_COLOR = 7
        GetStdHandle = ctypes.windll.Kernel32.GetStdHandle
        SetConsoleTextAttribute = (
            ctypes.windll.Kernel32.SetConsoleTextAttribute
        )

        colors = dict(green=2, red=4, brown=6, yellow=6)
        colors[None] = DEFAULT_COLOR
        try:
            color = colors[color]
        except KeyError:
            msg = (
                f"invalid color {color!r}; choose between"
                f" {list(colors.keys())!r}"
            )
            raise ValueError(msg) from None
        if bold and color <= 7:
            color += 8

        # -12 is used for sys.stderr, -11 for anything else.
        handle_id = -12 if file is sys.stderr else -11
        GetStdHandle.restype = ctypes.c_ulong
        handle = GetStdHandle(handle_id)
        SetConsoleTextAttribute(handle, color)
        try:
            print(s, file=file)
        finally:
            # Always restore the default console color.
            SetConsoleTextAttribute(handle, DEFAULT_COLOR)
846
847
def debug(msg):
    """Print a debug message to stderr, prefixed with the file name and
    line number of the caller. Does nothing unless PSUTIL_DEBUG is set.
    """
    if not PSUTIL_DEBUG:
        return
    import inspect

    caller = inspect.getframeinfo(inspect.currentframe().f_back)
    if isinstance(msg, OSError):
        # ...because str(exc) may contain info about the file name
        msg = f"ignoring {msg}"
    elif isinstance(msg, Exception):
        msg = f"ignoring {msg!r}"
    print(  # noqa: T201
        f"psutil-debug [{caller.filename}:{caller.lineno}]> {msg}",
        file=sys.stderr,
    )