Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/psutil-5.9.9-py3.8-linux-x86_64.egg/psutil/_compat.py: 13%
298 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-02 06:13 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-02 06:13 +0000
1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
5"""Module which provides compatibility with older Python versions.
6This is more future-compatible rather than the opposite (prefer latest
7Python 3 way of doing things).
8"""
10import collections
11import contextlib
12import errno
13import functools
14import os
15import sys
16import types
# Names exported by ``from <this module> import *``.
# fmt: off
__all__ = [
    # constants
    "PY3",
    # builtins
    "long", "range", "super", "unicode", "basestring",
    # literals
    "b",
    # functools module
    "lru_cache",
    # shutil module
    "which", "get_terminal_size",
    # subprocess module
    "SubprocessTimeoutExpired",
    # contextlib module
    "redirect_stderr",
    # python 3 exceptions
    "FileNotFoundError", "PermissionError", "ProcessLookupError",
    "InterruptedError", "ChildProcessError", "FileExistsError",
]
# fmt: on
# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3
# Unique private marker meaning "no value supplied / attribute missing".
_SENTINEL = object()

if PY3:
    # Python 2 builtin names mapped onto their Python 3 equivalents.
    long = int
    xrange = range
    unicode = str
    basestring = str
    range = range

    def b(s):
        """Return the text string *s* encoded to bytes using latin-1."""
        return s.encode("latin-1")

else:
    # Re-bind the Python 2 builtins so they can be imported from here.
    long = long
    range = xrange
    unicode = unicode
    basestring = basestring

    def b(s):
        """Return *s* unchanged (no encoding step on this branch)."""
        return s
63# --- builtins
66# Python 3 super().
67# Taken from "future" package.
68# Credit: Ryan Kelly
if PY3:
    super = super
else:
    # Keep a handle on the real builtin before shadowing it below.
    _builtin_super = super

    def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1):
        """Like Python 3 builtin super(). If called without any arguments
        it attempts to infer them at runtime.

        Parameters:
            type_: the class to start the lookup after; inferred from the
                calling frame when omitted.
            type_or_obj: the instance or class to bind to; inferred from
                the caller's first positional argument when ``type_`` is
                omitted.
            framedepth: how many frames up the stack the calling method
                lives (1 == the direct caller).

        Raises RuntimeError if the arguments cannot be inferred: the
        caller has no arguments, the object is not a new-style class
        instance, or no class in the MRO owns the calling code object.
        """
        if type_ is _SENTINEL:
            f = sys._getframe(framedepth)
            try:
                # Get the function's first positional argument.
                type_or_obj = f.f_locals[f.f_code.co_varnames[0]]
            except (IndexError, KeyError):
                msg = 'super() used in a function with no args'
                raise RuntimeError(msg)
            try:
                # Get the MRO so we can crawl it.
                mro = type_or_obj.__mro__
            except (AttributeError, RuntimeError):
                try:
                    mro = type_or_obj.__class__.__mro__
                except AttributeError:
                    msg = 'super() used in a non-newstyle class'
                    raise RuntimeError(msg)
            for type_ in mro:
                # Find the class that owns the currently-executing method.
                for meth in type_.__dict__.values():
                    # Drill down through any wrappers to the underlying func.
                    # This handles e.g. classmethod() and staticmethod().
                    try:
                        while not isinstance(meth, types.FunctionType):
                            if isinstance(meth, property):
                                # Calling __get__ on the property will invoke
                                # user code which might throw exceptions or
                                # have side effects
                                meth = meth.fget
                            else:
                                try:
                                    meth = meth.__func__
                                except AttributeError:
                                    meth = meth.__get__(type_or_obj, type_)
                    except (AttributeError, TypeError):
                        # Not something that wraps a function; skip it.
                        continue
                    if meth.func_code is f.f_code:
                        break  # found
                else:
                    # Not found. Move onto the next class in MRO.
                    continue
                break  # found
            else:
                # The whole MRO was scanned without a match.
                msg = 'super() called outside a method'
                raise RuntimeError(msg)

        # Dispatch to builtin super().
        if type_or_obj is not _SENTINEL:
            return _builtin_super(type_, type_or_obj)
        return _builtin_super(type_)
130# --- exceptions
if PY3:
    # Re-export the builtin exception classes under the same names.
    FileNotFoundError = FileNotFoundError  # NOQA
    PermissionError = PermissionError  # NOQA
    ProcessLookupError = ProcessLookupError  # NOQA
    InterruptedError = InterruptedError  # NOQA
    ChildProcessError = ChildProcessError  # NOQA
    FileExistsError = FileExistsError  # NOQA
else:
    # https://github.com/PythonCharmers/python-future/blob/exceptions/
    # src/future/types/exceptions/pep3151.py
    import platform

    def _instance_checking_exception(base_exception=Exception):
        """Return a decorator turning a predicate into an exception class.

        The decorated function receives an exception instance and returns
        True if that instance should be considered an instance of the
        generated class. The generated class derives from
        ``base_exception`` and takes the predicate's name and docstring.
        """

        def wrapped(instance_checker):
            class TemporaryClass(base_exception):
                def __init__(self, *args, **kwargs):
                    if len(args) == 1 and isinstance(args[0], TemporaryClass):
                        # Wrapping an already-matching exception: copy its
                        # non-dunder attributes instead of re-initializing.
                        unwrap_me = args[0]
                        for attr in dir(unwrap_me):
                            if not attr.startswith('__'):
                                setattr(self, attr, getattr(unwrap_me, attr))
                    else:
                        super(TemporaryClass, self).__init__(  # noqa
                            *args, **kwargs
                        )

                # Python 2 metaclass hook: isinstance() checks against this
                # class are delegated to the predicate.
                class __metaclass__(type):
                    def __instancecheck__(cls, inst):
                        return instance_checker(inst)

                    def __subclasscheck__(cls, classinfo):
                        # Decide based on the exception currently being
                        # handled rather than on the class passed in.
                        value = sys.exc_info()[1]
                        return isinstance(value, cls)

            TemporaryClass.__name__ = instance_checker.__name__
            TemporaryClass.__doc__ = instance_checker.__doc__
            return TemporaryClass

        return wrapped

    # Each shim below matches EnvironmentError instances by errno value.

    @_instance_checking_exception(EnvironmentError)
    def FileNotFoundError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT

    @_instance_checking_exception(EnvironmentError)
    def ProcessLookupError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH

    @_instance_checking_exception(EnvironmentError)
    def PermissionError(inst):
        return getattr(inst, 'errno', _SENTINEL) in (errno.EACCES, errno.EPERM)

    @_instance_checking_exception(EnvironmentError)
    def InterruptedError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.EINTR

    @_instance_checking_exception(EnvironmentError)
    def ChildProcessError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD

    @_instance_checking_exception(EnvironmentError)
    def FileExistsError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST

    # Sanity check on non-CPython implementations: an OSError carrying
    # EEXIST must be caught by ``except FileExistsError``; if it is not,
    # the shims above do not work on this interpreter.
    if platform.python_implementation() != "CPython":
        try:
            raise OSError(errno.EEXIST, "perm")
        except FileExistsError:
            pass
        except OSError:
            msg = (
                "broken or incompatible Python implementation, see: "
                "https://github.com/giampaolo/psutil/issues/1659"
            )
            raise RuntimeError(msg)
210# --- stdlib additions
213# py 3.2 functools.lru_cache
214# Taken from: http://code.activestate.com/recipes/578078
215# Credit: Raymond Hettinger
try:
    from functools import lru_cache
except ImportError:
    try:
        from threading import RLock
    except ImportError:
        from dummy_threading import RLock

    _CacheInfo = collections.namedtuple(
        "CacheInfo", ["hits", "misses", "maxsize", "currsize"]
    )

    class _HashedSeq(list):
        """List that computes its hash once, at construction time."""

        __slots__ = ('hashvalue',)

        def __init__(self, tup, hash=hash):
            self[:] = tup
            self.hashvalue = hash(tup)

        def __hash__(self):
            return self.hashvalue

    def _make_key(
        args,
        kwds,
        typed,
        kwd_mark=(_SENTINEL,),
        fasttypes=set((int, str, frozenset, type(None))),  # noqa
        sorted=sorted,
        tuple=tuple,
        type=type,
        len=len,
    ):
        """Build a cache key from positional and keyword arguments.

        The key is the positional args, followed (when keywords are given)
        by ``kwd_mark`` and the sorted keyword items flattened. With
        ``typed`` true the argument types are appended as well. A lone
        argument of one of the ``fasttypes`` is returned as-is; anything
        else is wrapped in a _HashedSeq.
        """
        key = args
        if kwds:
            sorted_items = sorted(kwds.items())
            key += kwd_mark
            for item in sorted_items:
                key += item
        if typed:
            key += tuple(type(v) for v in args)
            if kwds:
                key += tuple(type(v) for k, v in sorted_items)
        elif len(key) == 1 and type(key[0]) in fasttypes:
            return key[0]
        return _HashedSeq(key)

    def lru_cache(maxsize=100, typed=False):
        """Least-recently-used cache decorator, see:
        http://docs.python.org/3/library/functools.html#functools.lru_cache.

        ``maxsize=0`` disables caching (only misses are counted),
        ``maxsize=None`` caches without bound, any other value bounds the
        cache and evicts the least recently used entry when it is full.
        With ``typed`` true, argument types become part of the key.
        The decorated function gains ``cache_info()``, ``cache_clear()``
        and ``__wrapped__``.
        """

        def decorating_function(user_function):
            cache = {}
            stats = [0, 0]  # mutable so the closures can update the counters
            HITS, MISSES = 0, 1
            make_key = _make_key
            cache_get = cache.get
            _len = len
            lock = RLock()
            # Circular doubly linked list; ``root`` is the sentinel node.
            root = []
            root[:] = [root, root, None, None]
            # One-element list so closures can re-point the root.
            nonlocal_root = [root]
            PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
            if maxsize == 0:

                def wrapper(*args, **kwds):
                    # No caching: just count the call as a miss.
                    result = user_function(*args, **kwds)
                    stats[MISSES] += 1
                    return result

            elif maxsize is None:

                def wrapper(*args, **kwds):
                    # Unbounded cache: plain dict lookup, no ordering.
                    key = make_key(args, kwds, typed)
                    # ``root`` doubles as the "not found" marker.
                    result = cache_get(key, root)
                    if result is not root:
                        stats[HITS] += 1
                        return result
                    result = user_function(*args, **kwds)
                    cache[key] = result
                    stats[MISSES] += 1
                    return result

            else:

                def wrapper(*args, **kwds):
                    # Bounded cache: track recency in the linked list.
                    if kwds or typed:
                        key = make_key(args, kwds, typed)
                    else:
                        key = args
                    with lock:
                        link = cache_get(key)
                        if link is not None:
                            # Hit: unlink the node and re-insert it just
                            # before root (the most recently used slot).
                            (root,) = nonlocal_root
                            link_prev, link_next, key, result = link
                            link_prev[NEXT] = link_next
                            link_next[PREV] = link_prev
                            last = root[PREV]
                            last[NEXT] = root[PREV] = link
                            link[PREV] = last
                            link[NEXT] = root
                            stats[HITS] += 1
                            return result
                    # The user function runs outside the lock.
                    result = user_function(*args, **kwds)
                    with lock:
                        (root,) = nonlocal_root
                        if key in cache:
                            # The key was stored while the lock was
                            # released; keep the existing entry.
                            pass
                        elif _len(cache) >= maxsize:
                            # Full: store the new entry in the old root
                            # node and turn the oldest node into the new
                            # (empty) root, dropping its key.
                            oldroot = root
                            oldroot[KEY] = key
                            oldroot[RESULT] = result
                            root = nonlocal_root[0] = oldroot[NEXT]
                            oldkey = root[KEY]
                            root[KEY] = root[RESULT] = None
                            del cache[oldkey]
                            cache[key] = oldroot
                        else:
                            # Room left: append a new node before root.
                            last = root[PREV]
                            link = [last, root, key, result]
                            last[NEXT] = root[PREV] = cache[key] = link
                        stats[MISSES] += 1
                    return result

            def cache_info():
                """Report cache statistics."""
                with lock:
                    return _CacheInfo(
                        stats[HITS], stats[MISSES], maxsize, len(cache)
                    )

            def cache_clear():
                """Clear the cache and cache statistics."""
                with lock:
                    cache.clear()
                    root = nonlocal_root[0]
                    root[:] = [root, root, None, None]
                    stats[:] = [0, 0]

            wrapper.__wrapped__ = user_function
            wrapper.cache_info = cache_info
            wrapper.cache_clear = cache_clear
            return functools.update_wrapper(wrapper, user_function)

        return decorating_function
376# python 3.3
try:
    from shutil import which
except ImportError:

    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Given a command, mode, and a PATH string, return the path which
        conforms to the given mode on the PATH, or None if there is no such
        file.

        `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
        of os.environ.get("PATH"), or can be overridden with a custom search
        path.
        """

        def _access_check(fn, mode):
            # Exists, is accessible with ``mode`` and is not a directory.
            return (
                os.path.exists(fn)
                and os.access(fn, mode)
                and not os.path.isdir(fn)
            )

        # A command containing a directory part is checked directly and
        # never searched for on the path.
        if os.path.dirname(cmd):
            if _access_check(cmd, mode):
                return cmd
            return None

        if path is None:
            path = os.environ.get("PATH", os.defpath)
        if not path:
            return None
        path = path.split(os.pathsep)

        if sys.platform == "win32":
            # The current directory is searched first.
            if os.curdir not in path:
                path.insert(0, os.curdir)

            # If the command already ends with one of the PATHEXT
            # extensions use it verbatim, else try every extension.
            pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
            lowered_cmd = cmd.lower()
            if any(lowered_cmd.endswith(ext.lower()) for ext in pathext):
                files = [cmd]
            else:
                files = [cmd + ext for ext in pathext]
        else:
            files = [cmd]

        seen = set()
        for directory in path:
            normdir = os.path.normcase(directory)
            if normdir in seen:
                # Each (case-normalized) directory is probed only once.
                continue
            seen.add(normdir)
            for thefile in files:
                name = os.path.join(directory, thefile)
                if _access_check(name, mode):
                    return name
        return None
433# python 3.3
try:
    from shutil import get_terminal_size
except ImportError:

    def get_terminal_size(fallback=(80, 24)):
        """Return the terminal size as a 2-item tuple.

        The size is obtained with a TIOCGWINSZ ioctl on file descriptor 1.
        ``fallback`` is returned if the required modules are unavailable
        or the query fails for any reason.
        """
        try:
            import fcntl
            import struct
            import termios
        except ImportError:
            return fallback
        try:
            # This should work on Linux.
            res = struct.unpack(
                'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234')
            )
            # The two unpacked values are returned in swapped order.
            return (res[1], res[0])
        except Exception:  # noqa: BLE001
            # Deliberately best-effort: any failure yields the fallback.
            return fallback
456# python 3.3
try:
    from subprocess import TimeoutExpired as SubprocessTimeoutExpired
except ImportError:

    class SubprocessTimeoutExpired(Exception):
        """Stand-in used when subprocess.TimeoutExpired is unavailable."""
465# python 3.5
try:
    from contextlib import redirect_stderr
except ImportError:

    @contextlib.contextmanager
    def redirect_stderr(new_target):
        """Context manager temporarily replacing sys.stderr.

        ``sys.stderr`` is set to ``new_target`` for the duration of the
        ``with`` block (``new_target`` is also the value bound by ``as``)
        and the previous stream is restored on exit, even on error.
        """
        original = sys.stderr
        try:
            sys.stderr = new_target
            yield new_target
        finally:
            sys.stderr = original