Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/psutil-5.9.6-py3.8-linux-x86_64.egg/psutil/_compat.py: 13%
298 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:00 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:00 +0000
1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
5"""Module which provides compatibility with older Python versions.
6This is more future-compatible rather than the opposite (prefer latest
7Python 3 way of doing things).
8"""
10import collections
11import contextlib
12import errno
13import functools
14import os
15import sys
16import types
19__all__ = [
20 # constants
21 "PY3",
22 # builtins
23 "long", "range", "super", "unicode", "basestring",
24 # literals
25 "u", "b",
26 # collections module
27 "lru_cache",
28 # shutil module
29 "which", "get_terminal_size",
30 # contextlib module
31 "redirect_stderr",
32 # python 3 exceptions
33 "FileNotFoundError", "PermissionError", "ProcessLookupError",
34 "InterruptedError", "ChildProcessError", "FileExistsError"]
37PY3 = sys.version_info[0] == 3
38_SENTINEL = object()
40if PY3:
41 long = int
42 xrange = range
43 unicode = str
44 basestring = str
45 range = range
47 def u(s):
48 return s
50 def b(s):
51 return s.encode("latin-1")
52else:
53 long = long
54 range = xrange
55 unicode = unicode
56 basestring = basestring
58 def u(s):
59 return unicode(s, "unicode_escape")
61 def b(s):
62 return s
65# --- builtins
68# Python 3 super().
69# Taken from "future" package.
70# Credit: Ryan Kelly
71if PY3:
72 super = super
73else:
74 _builtin_super = super
76 def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1):
77 """Like Python 3 builtin super(). If called without any arguments
78 it attempts to infer them at runtime.
79 """
80 if type_ is _SENTINEL:
81 f = sys._getframe(framedepth)
82 try:
83 # Get the function's first positional argument.
84 type_or_obj = f.f_locals[f.f_code.co_varnames[0]]
85 except (IndexError, KeyError):
86 raise RuntimeError('super() used in a function with no args')
87 try:
88 # Get the MRO so we can crawl it.
89 mro = type_or_obj.__mro__
90 except (AttributeError, RuntimeError):
91 try:
92 mro = type_or_obj.__class__.__mro__
93 except AttributeError:
94 raise RuntimeError('super() used in a non-newstyle class')
95 for type_ in mro:
96 # Find the class that owns the currently-executing method.
97 for meth in type_.__dict__.values():
98 # Drill down through any wrappers to the underlying func.
99 # This handles e.g. classmethod() and staticmethod().
100 try:
101 while not isinstance(meth, types.FunctionType):
102 if isinstance(meth, property):
103 # Calling __get__ on the property will invoke
104 # user code which might throw exceptions or
105 # have side effects
106 meth = meth.fget
107 else:
108 try:
109 meth = meth.__func__
110 except AttributeError:
111 meth = meth.__get__(type_or_obj, type_)
112 except (AttributeError, TypeError):
113 continue
114 if meth.func_code is f.f_code:
115 break # found
116 else:
117 # Not found. Move onto the next class in MRO.
118 continue
119 break # found
120 else:
121 raise RuntimeError('super() called outside a method')
123 # Dispatch to builtin super().
124 if type_or_obj is not _SENTINEL:
125 return _builtin_super(type_, type_or_obj)
126 return _builtin_super(type_)
129# --- exceptions
132if PY3:
133 FileNotFoundError = FileNotFoundError # NOQA
134 PermissionError = PermissionError # NOQA
135 ProcessLookupError = ProcessLookupError # NOQA
136 InterruptedError = InterruptedError # NOQA
137 ChildProcessError = ChildProcessError # NOQA
138 FileExistsError = FileExistsError # NOQA
139else:
140 # https://github.com/PythonCharmers/python-future/blob/exceptions/
141 # src/future/types/exceptions/pep3151.py
142 import platform
144 def _instance_checking_exception(base_exception=Exception):
145 def wrapped(instance_checker):
146 class TemporaryClass(base_exception):
148 def __init__(self, *args, **kwargs):
149 if len(args) == 1 and isinstance(args[0], TemporaryClass):
150 unwrap_me = args[0]
151 for attr in dir(unwrap_me):
152 if not attr.startswith('__'):
153 setattr(self, attr, getattr(unwrap_me, attr))
154 else:
155 super(TemporaryClass, self).__init__(*args, **kwargs)
157 class __metaclass__(type):
158 def __instancecheck__(cls, inst):
159 return instance_checker(inst)
161 def __subclasscheck__(cls, classinfo):
162 value = sys.exc_info()[1]
163 return isinstance(value, cls)
165 TemporaryClass.__name__ = instance_checker.__name__
166 TemporaryClass.__doc__ = instance_checker.__doc__
167 return TemporaryClass
169 return wrapped
171 @_instance_checking_exception(EnvironmentError)
172 def FileNotFoundError(inst):
173 return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT
175 @_instance_checking_exception(EnvironmentError)
176 def ProcessLookupError(inst):
177 return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH
179 @_instance_checking_exception(EnvironmentError)
180 def PermissionError(inst):
181 return getattr(inst, 'errno', _SENTINEL) in (
182 errno.EACCES, errno.EPERM)
184 @_instance_checking_exception(EnvironmentError)
185 def InterruptedError(inst):
186 return getattr(inst, 'errno', _SENTINEL) == errno.EINTR
188 @_instance_checking_exception(EnvironmentError)
189 def ChildProcessError(inst):
190 return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD
192 @_instance_checking_exception(EnvironmentError)
193 def FileExistsError(inst):
194 return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST
196 if platform.python_implementation() != "CPython":
197 try:
198 raise OSError(errno.EEXIST, "perm")
199 except FileExistsError:
200 pass
201 except OSError:
202 raise RuntimeError(
203 "broken or incompatible Python implementation, see: "
204 "https://github.com/giampaolo/psutil/issues/1659")
207# --- stdlib additions
210# py 3.2 functools.lru_cache
211# Taken from: http://code.activestate.com/recipes/578078
212# Credit: Raymond Hettinger
213try:
214 from functools import lru_cache
215except ImportError:
216 try:
217 from threading import RLock
218 except ImportError:
219 from dummy_threading import RLock
221 _CacheInfo = collections.namedtuple(
222 "CacheInfo", ["hits", "misses", "maxsize", "currsize"])
224 class _HashedSeq(list):
225 __slots__ = 'hashvalue'
227 def __init__(self, tup, hash=hash):
228 self[:] = tup
229 self.hashvalue = hash(tup)
231 def __hash__(self):
232 return self.hashvalue
234 def _make_key(args, kwds, typed,
235 kwd_mark=(_SENTINEL, ),
236 fasttypes=set((int, str, frozenset, type(None))), # noqa
237 sorted=sorted, tuple=tuple, type=type, len=len):
238 key = args
239 if kwds:
240 sorted_items = sorted(kwds.items())
241 key += kwd_mark
242 for item in sorted_items:
243 key += item
244 if typed:
245 key += tuple(type(v) for v in args)
246 if kwds:
247 key += tuple(type(v) for k, v in sorted_items)
248 elif len(key) == 1 and type(key[0]) in fasttypes:
249 return key[0]
250 return _HashedSeq(key)
252 def lru_cache(maxsize=100, typed=False):
253 """Least-recently-used cache decorator, see:
254 http://docs.python.org/3/library/functools.html#functools.lru_cache
255 """
256 def decorating_function(user_function):
257 cache = {}
258 stats = [0, 0]
259 HITS, MISSES = 0, 1
260 make_key = _make_key
261 cache_get = cache.get
262 _len = len
263 lock = RLock()
264 root = []
265 root[:] = [root, root, None, None]
266 nonlocal_root = [root]
267 PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
268 if maxsize == 0:
269 def wrapper(*args, **kwds):
270 result = user_function(*args, **kwds)
271 stats[MISSES] += 1
272 return result
273 elif maxsize is None:
274 def wrapper(*args, **kwds):
275 key = make_key(args, kwds, typed)
276 result = cache_get(key, root)
277 if result is not root:
278 stats[HITS] += 1
279 return result
280 result = user_function(*args, **kwds)
281 cache[key] = result
282 stats[MISSES] += 1
283 return result
284 else:
285 def wrapper(*args, **kwds):
286 if kwds or typed:
287 key = make_key(args, kwds, typed)
288 else:
289 key = args
290 lock.acquire()
291 try:
292 link = cache_get(key)
293 if link is not None:
294 root, = nonlocal_root
295 link_prev, link_next, key, result = link
296 link_prev[NEXT] = link_next
297 link_next[PREV] = link_prev
298 last = root[PREV]
299 last[NEXT] = root[PREV] = link
300 link[PREV] = last
301 link[NEXT] = root
302 stats[HITS] += 1
303 return result
304 finally:
305 lock.release()
306 result = user_function(*args, **kwds)
307 lock.acquire()
308 try:
309 root, = nonlocal_root
310 if key in cache:
311 pass
312 elif _len(cache) >= maxsize:
313 oldroot = root
314 oldroot[KEY] = key
315 oldroot[RESULT] = result
316 root = nonlocal_root[0] = oldroot[NEXT]
317 oldkey = root[KEY]
318 root[KEY] = root[RESULT] = None
319 del cache[oldkey]
320 cache[key] = oldroot
321 else:
322 last = root[PREV]
323 link = [last, root, key, result]
324 last[NEXT] = root[PREV] = cache[key] = link
325 stats[MISSES] += 1
326 finally:
327 lock.release()
328 return result
330 def cache_info():
331 """Report cache statistics"""
332 lock.acquire()
333 try:
334 return _CacheInfo(stats[HITS], stats[MISSES], maxsize,
335 len(cache))
336 finally:
337 lock.release()
339 def cache_clear():
340 """Clear the cache and cache statistics"""
341 lock.acquire()
342 try:
343 cache.clear()
344 root = nonlocal_root[0]
345 root[:] = [root, root, None, None]
346 stats[:] = [0, 0]
347 finally:
348 lock.release()
350 wrapper.__wrapped__ = user_function
351 wrapper.cache_info = cache_info
352 wrapper.cache_clear = cache_clear
353 return functools.update_wrapper(wrapper, user_function)
355 return decorating_function
358# python 3.3
359try:
360 from shutil import which
361except ImportError:
362 def which(cmd, mode=os.F_OK | os.X_OK, path=None):
363 """Given a command, mode, and a PATH string, return the path which
364 conforms to the given mode on the PATH, or None if there is no such
365 file.
367 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
368 of os.environ.get("PATH"), or can be overridden with a custom search
369 path.
370 """
371 def _access_check(fn, mode):
372 return (os.path.exists(fn) and os.access(fn, mode) and
373 not os.path.isdir(fn))
375 if os.path.dirname(cmd):
376 if _access_check(cmd, mode):
377 return cmd
378 return None
380 if path is None:
381 path = os.environ.get("PATH", os.defpath)
382 if not path:
383 return None
384 path = path.split(os.pathsep)
386 if sys.platform == "win32":
387 if os.curdir not in path:
388 path.insert(0, os.curdir)
390 pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
391 if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
392 files = [cmd]
393 else:
394 files = [cmd + ext for ext in pathext]
395 else:
396 files = [cmd]
398 seen = set()
399 for dir in path:
400 normdir = os.path.normcase(dir)
401 if normdir not in seen:
402 seen.add(normdir)
403 for thefile in files:
404 name = os.path.join(dir, thefile)
405 if _access_check(name, mode):
406 return name
407 return None
410# python 3.3
411try:
412 from shutil import get_terminal_size
413except ImportError:
414 def get_terminal_size(fallback=(80, 24)):
415 try:
416 import fcntl
417 import struct
418 import termios
419 except ImportError:
420 return fallback
421 else:
422 try:
423 # This should work on Linux.
424 res = struct.unpack(
425 'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
426 return (res[1], res[0])
427 except Exception:
428 return fallback
431# python 3.3
432try:
433 from subprocess import TimeoutExpired as SubprocessTimeoutExpired
434except ImportError:
435 class SubprocessTimeoutExpired(Exception):
436 pass
439# python 3.5
440try:
441 from contextlib import redirect_stderr
442except ImportError:
443 @contextlib.contextmanager
444 def redirect_stderr(new_target):
445 original = sys.stderr
446 try:
447 sys.stderr = new_target
448 yield new_target
449 finally:
450 sys.stderr = original