Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/psutil/_compat.py: 13%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
5"""Module which provides compatibility with older Python versions.
6This is more future-compatible rather than the opposite (prefer latest
7Python 3 way of doing things).
8"""
10import collections
11import contextlib
12import errno
13import functools
14import os
15import sys
16import types
19# fmt: off
20__all__ = [
21 # constants
22 "PY3",
23 # builtins
24 "long", "range", "super", "unicode", "basestring",
25 # literals
26 "u", "b",
27 # collections module
28 "lru_cache",
29 # shutil module
30 "which", "get_terminal_size",
31 # contextlib module
32 "redirect_stderr",
33 # python 3 exceptions
34 "FileNotFoundError", "PermissionError", "ProcessLookupError",
35 "InterruptedError", "ChildProcessError", "FileExistsError",
36]
37# fmt: on
40PY3 = sys.version_info[0] >= 3
41_SENTINEL = object()
43if PY3:
44 long = int
45 xrange = range
46 unicode = str
47 basestring = str
48 range = range
50 def u(s):
51 return s
53 def b(s):
54 return s.encode("latin-1")
56else:
57 long = long
58 range = xrange
59 unicode = unicode
60 basestring = basestring
62 def u(s):
63 return unicode(s, "unicode_escape")
65 def b(s):
66 return s
69# --- builtins
72# Python 3 super().
73# Taken from "future" package.
74# Credit: Ryan Kelly
75if PY3:
76 super = super
77else:
78 _builtin_super = super
80 def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1):
81 """Like Python 3 builtin super(). If called without any arguments
82 it attempts to infer them at runtime.
83 """
84 if type_ is _SENTINEL:
85 f = sys._getframe(framedepth)
86 try:
87 # Get the function's first positional argument.
88 type_or_obj = f.f_locals[f.f_code.co_varnames[0]]
89 except (IndexError, KeyError):
90 msg = 'super() used in a function with no args'
91 raise RuntimeError(msg)
92 try:
93 # Get the MRO so we can crawl it.
94 mro = type_or_obj.__mro__
95 except (AttributeError, RuntimeError):
96 try:
97 mro = type_or_obj.__class__.__mro__
98 except AttributeError:
99 msg = 'super() used in a non-newstyle class'
100 raise RuntimeError(msg)
101 for type_ in mro:
102 # Find the class that owns the currently-executing method.
103 for meth in type_.__dict__.values():
104 # Drill down through any wrappers to the underlying func.
105 # This handles e.g. classmethod() and staticmethod().
106 try:
107 while not isinstance(meth, types.FunctionType):
108 if isinstance(meth, property):
109 # Calling __get__ on the property will invoke
110 # user code which might throw exceptions or
111 # have side effects
112 meth = meth.fget
113 else:
114 try:
115 meth = meth.__func__
116 except AttributeError:
117 meth = meth.__get__(type_or_obj, type_)
118 except (AttributeError, TypeError):
119 continue
120 if meth.func_code is f.f_code:
121 break # found
122 else:
123 # Not found. Move onto the next class in MRO.
124 continue
125 break # found
126 else:
127 msg = 'super() called outside a method'
128 raise RuntimeError(msg)
130 # Dispatch to builtin super().
131 if type_or_obj is not _SENTINEL:
132 return _builtin_super(type_, type_or_obj)
133 return _builtin_super(type_)
136# --- exceptions
139if PY3:
140 FileNotFoundError = FileNotFoundError # NOQA
141 PermissionError = PermissionError # NOQA
142 ProcessLookupError = ProcessLookupError # NOQA
143 InterruptedError = InterruptedError # NOQA
144 ChildProcessError = ChildProcessError # NOQA
145 FileExistsError = FileExistsError # NOQA
146else:
147 # https://github.com/PythonCharmers/python-future/blob/exceptions/
148 # src/future/types/exceptions/pep3151.py
149 import platform
151 def _instance_checking_exception(base_exception=Exception):
152 def wrapped(instance_checker):
153 class TemporaryClass(base_exception):
154 def __init__(self, *args, **kwargs):
155 if len(args) == 1 and isinstance(args[0], TemporaryClass):
156 unwrap_me = args[0]
157 for attr in dir(unwrap_me):
158 if not attr.startswith('__'):
159 setattr(self, attr, getattr(unwrap_me, attr))
160 else:
161 super(TemporaryClass, self).__init__( # noqa
162 *args, **kwargs
163 )
165 class __metaclass__(type):
166 def __instancecheck__(cls, inst):
167 return instance_checker(inst)
169 def __subclasscheck__(cls, classinfo):
170 value = sys.exc_info()[1]
171 return isinstance(value, cls)
173 TemporaryClass.__name__ = instance_checker.__name__
174 TemporaryClass.__doc__ = instance_checker.__doc__
175 return TemporaryClass
177 return wrapped
179 @_instance_checking_exception(EnvironmentError)
180 def FileNotFoundError(inst):
181 return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT
183 @_instance_checking_exception(EnvironmentError)
184 def ProcessLookupError(inst):
185 return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH
187 @_instance_checking_exception(EnvironmentError)
188 def PermissionError(inst):
189 return getattr(inst, 'errno', _SENTINEL) in (errno.EACCES, errno.EPERM)
191 @_instance_checking_exception(EnvironmentError)
192 def InterruptedError(inst):
193 return getattr(inst, 'errno', _SENTINEL) == errno.EINTR
195 @_instance_checking_exception(EnvironmentError)
196 def ChildProcessError(inst):
197 return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD
199 @_instance_checking_exception(EnvironmentError)
200 def FileExistsError(inst):
201 return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST
203 if platform.python_implementation() != "CPython":
204 try:
205 raise OSError(errno.EEXIST, "perm")
206 except FileExistsError:
207 pass
208 except OSError:
209 msg = (
210 "broken or incompatible Python implementation, see: "
211 "https://github.com/giampaolo/psutil/issues/1659"
212 )
213 raise RuntimeError(msg)
216# --- stdlib additions
219# py 3.2 functools.lru_cache
220# Taken from: http://code.activestate.com/recipes/578078
221# Credit: Raymond Hettinger
222try:
223 from functools import lru_cache
224except ImportError:
225 try:
226 from threading import RLock
227 except ImportError:
228 from dummy_threading import RLock
230 _CacheInfo = collections.namedtuple(
231 "CacheInfo", ["hits", "misses", "maxsize", "currsize"]
232 )
234 class _HashedSeq(list):
235 __slots__ = ('hashvalue',)
237 def __init__(self, tup, hash=hash):
238 self[:] = tup
239 self.hashvalue = hash(tup)
241 def __hash__(self):
242 return self.hashvalue
244 def _make_key(
245 args,
246 kwds,
247 typed,
248 kwd_mark=(_SENTINEL,),
249 fasttypes=set((int, str, frozenset, type(None))), # noqa
250 sorted=sorted,
251 tuple=tuple,
252 type=type,
253 len=len,
254 ):
255 key = args
256 if kwds:
257 sorted_items = sorted(kwds.items())
258 key += kwd_mark
259 for item in sorted_items:
260 key += item
261 if typed:
262 key += tuple(type(v) for v in args)
263 if kwds:
264 key += tuple(type(v) for k, v in sorted_items)
265 elif len(key) == 1 and type(key[0]) in fasttypes:
266 return key[0]
267 return _HashedSeq(key)
269 def lru_cache(maxsize=100, typed=False):
270 """Least-recently-used cache decorator, see:
271 http://docs.python.org/3/library/functools.html#functools.lru_cache.
272 """
274 def decorating_function(user_function):
275 cache = {}
276 stats = [0, 0]
277 HITS, MISSES = 0, 1
278 make_key = _make_key
279 cache_get = cache.get
280 _len = len
281 lock = RLock()
282 root = []
283 root[:] = [root, root, None, None]
284 nonlocal_root = [root]
285 PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
286 if maxsize == 0:
288 def wrapper(*args, **kwds):
289 result = user_function(*args, **kwds)
290 stats[MISSES] += 1
291 return result
293 elif maxsize is None:
295 def wrapper(*args, **kwds):
296 key = make_key(args, kwds, typed)
297 result = cache_get(key, root)
298 if result is not root:
299 stats[HITS] += 1
300 return result
301 result = user_function(*args, **kwds)
302 cache[key] = result
303 stats[MISSES] += 1
304 return result
306 else:
308 def wrapper(*args, **kwds):
309 if kwds or typed:
310 key = make_key(args, kwds, typed)
311 else:
312 key = args
313 lock.acquire()
314 try:
315 link = cache_get(key)
316 if link is not None:
317 (root,) = nonlocal_root
318 link_prev, link_next, key, result = link
319 link_prev[NEXT] = link_next
320 link_next[PREV] = link_prev
321 last = root[PREV]
322 last[NEXT] = root[PREV] = link
323 link[PREV] = last
324 link[NEXT] = root
325 stats[HITS] += 1
326 return result
327 finally:
328 lock.release()
329 result = user_function(*args, **kwds)
330 lock.acquire()
331 try:
332 (root,) = nonlocal_root
333 if key in cache:
334 pass
335 elif _len(cache) >= maxsize:
336 oldroot = root
337 oldroot[KEY] = key
338 oldroot[RESULT] = result
339 root = nonlocal_root[0] = oldroot[NEXT]
340 oldkey = root[KEY]
341 root[KEY] = root[RESULT] = None
342 del cache[oldkey]
343 cache[key] = oldroot
344 else:
345 last = root[PREV]
346 link = [last, root, key, result]
347 last[NEXT] = root[PREV] = cache[key] = link
348 stats[MISSES] += 1
349 finally:
350 lock.release()
351 return result
353 def cache_info():
354 """Report cache statistics."""
355 lock.acquire()
356 try:
357 return _CacheInfo(
358 stats[HITS], stats[MISSES], maxsize, len(cache)
359 )
360 finally:
361 lock.release()
363 def cache_clear():
364 """Clear the cache and cache statistics."""
365 lock.acquire()
366 try:
367 cache.clear()
368 root = nonlocal_root[0]
369 root[:] = [root, root, None, None]
370 stats[:] = [0, 0]
371 finally:
372 lock.release()
374 wrapper.__wrapped__ = user_function
375 wrapper.cache_info = cache_info
376 wrapper.cache_clear = cache_clear
377 return functools.update_wrapper(wrapper, user_function)
379 return decorating_function
382# python 3.3
383try:
384 from shutil import which
385except ImportError:
387 def which(cmd, mode=os.F_OK | os.X_OK, path=None):
388 """Given a command, mode, and a PATH string, return the path which
389 conforms to the given mode on the PATH, or None if there is no such
390 file.
392 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
393 of os.environ.get("PATH"), or can be overridden with a custom search
394 path.
395 """
397 def _access_check(fn, mode):
398 return (
399 os.path.exists(fn)
400 and os.access(fn, mode)
401 and not os.path.isdir(fn)
402 )
404 if os.path.dirname(cmd):
405 if _access_check(cmd, mode):
406 return cmd
407 return None
409 if path is None:
410 path = os.environ.get("PATH", os.defpath)
411 if not path:
412 return None
413 path = path.split(os.pathsep)
415 if sys.platform == "win32":
416 if os.curdir not in path:
417 path.insert(0, os.curdir)
419 pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
420 if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
421 files = [cmd]
422 else:
423 files = [cmd + ext for ext in pathext]
424 else:
425 files = [cmd]
427 seen = set()
428 for dir in path:
429 normdir = os.path.normcase(dir)
430 if normdir not in seen:
431 seen.add(normdir)
432 for thefile in files:
433 name = os.path.join(dir, thefile)
434 if _access_check(name, mode):
435 return name
436 return None
439# python 3.3
440try:
441 from shutil import get_terminal_size
442except ImportError:
444 def get_terminal_size(fallback=(80, 24)):
445 try:
446 import fcntl
447 import struct
448 import termios
449 except ImportError:
450 return fallback
451 else:
452 try:
453 # This should work on Linux.
454 res = struct.unpack(
455 'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234')
456 )
457 return (res[1], res[0])
458 except Exception: # noqa: BLE001
459 return fallback
462# python 3.3
463try:
464 from subprocess import TimeoutExpired as SubprocessTimeoutExpired
465except ImportError:
467 class SubprocessTimeoutExpired(Exception):
468 pass
471# python 3.5
472try:
473 from contextlib import redirect_stderr
474except ImportError:
476 @contextlib.contextmanager
477 def redirect_stderr(new_target):
478 original = sys.stderr
479 try:
480 sys.stderr = new_target
481 yield new_target
482 finally:
483 sys.stderr = original