Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/psutil-5.9.6-py3.8-linux-x86_64.egg/psutil/_compat.py: 13%

298 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 07:00 +0000

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Compatibility layer for older Python versions.

It is written to be forward-compatible rather than the opposite: the
latest Python 3 way of doing things is preferred, and older
interpreters are given backports of it.
"""

import collections
import contextlib
import errno
import functools
import os
import sys
import types

# Names exported by a star-import of this module.
__all__ = [
    # constants
    "PY3",
    # builtins
    "long", "range", "super", "unicode", "basestring",
    # literals
    "u", "b",
    # functools module
    "lru_cache",
    # shutil module
    "which", "get_terminal_size",
    # contextlib module
    "redirect_stderr",
    # python 3 exceptions
    "FileNotFoundError", "PermissionError", "ProcessLookupError",
    "InterruptedError", "ChildProcessError", "FileExistsError"]

# True when the running interpreter's major version is 3. Every shim in
# this module branches on it.
PY3 = sys.version_info[0] == 3
# Unique marker object used as a "nothing supplied" default, so that None
# stays distinguishable from a missing value.
_SENTINEL = object()

if PY3:
    # Python 2 builtin names mapped onto their Python 3 equivalents.
    long = int
    xrange = range
    unicode = str
    basestring = str
    range = range

    def u(s):
        """Return ``s`` unchanged (Python 3 branch of the text helper)."""
        return s

    def b(s):
        """Return ``s`` encoded to bytes with the latin-1 codec."""
        return s.encode("latin-1")
else:
    # Python 2: re-bind the builtins so they are importable from here.
    # ``range`` is bound to ``xrange``.
    long = long
    range = xrange
    unicode = unicode
    basestring = basestring

    def u(s):
        """Return ``s`` decoded to unicode with the unicode_escape codec."""
        return unicode(s, "unicode_escape")

    def b(s):
        """Return ``s`` unchanged (Python 2 branch of the bytes helper)."""
        return s

# --- builtins


# Python 3 super().
# Taken from "future" package.
# Credit: Ryan Kelly
if PY3:
    super = super
else:
    _builtin_super = super

    def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1):
        """Like Python 3 builtin super(). If called without any arguments
        it attempts to infer them at runtime.

        ``type_`` and ``type_or_obj`` are forwarded to the builtin
        ``super``. When ``type_`` is omitted both are inferred from the
        frame ``framedepth`` levels up the call stack.

        Raises RuntimeError when the calling function has no arguments,
        when its first argument has no MRO, or when no class in that MRO
        owns the calling code object.
        """
        if type_ is _SENTINEL:
            f = sys._getframe(framedepth)
            try:
                # Get the function's first positional argument.
                type_or_obj = f.f_locals[f.f_code.co_varnames[0]]
            except (IndexError, KeyError):
                raise RuntimeError('super() used in a function with no args')
            try:
                # Get the MRO so we can crawl it.
                mro = type_or_obj.__mro__
            except (AttributeError, RuntimeError):
                try:
                    mro = type_or_obj.__class__.__mro__
                except AttributeError:
                    raise RuntimeError('super() used in a non-newstyle class')
            for type_ in mro:
                # Find the class that owns the currently-executing method.
                for meth in type_.__dict__.values():
                    # Drill down through any wrappers to the underlying func.
                    # This handles e.g. classmethod() and staticmethod().
                    try:
                        while not isinstance(meth, types.FunctionType):
                            if isinstance(meth, property):
                                # Calling __get__ on the property will invoke
                                # user code which might throw exceptions or
                                # have side effects
                                meth = meth.fget
                            else:
                                try:
                                    meth = meth.__func__
                                except AttributeError:
                                    meth = meth.__get__(type_or_obj, type_)
                    except (AttributeError, TypeError):
                        # Not something that wraps a function: skip it.
                        continue
                    if meth.func_code is f.f_code:
                        break  # found
                else:
                    # Not found. Move onto the next class in MRO.
                    continue
                break  # found
            else:
                raise RuntimeError('super() called outside a method')

        # Dispatch to builtin super().
        if type_or_obj is not _SENTINEL:
            return _builtin_super(type_, type_or_obj)
        return _builtin_super(type_)

# --- exceptions


if PY3:
    FileNotFoundError = FileNotFoundError  # NOQA
    PermissionError = PermissionError  # NOQA
    ProcessLookupError = ProcessLookupError  # NOQA
    InterruptedError = InterruptedError  # NOQA
    ChildProcessError = ChildProcessError  # NOQA
    FileExistsError = FileExistsError  # NOQA
else:
    # https://github.com/PythonCharmers/python-future/blob/exceptions/
    #     src/future/types/exceptions/pep3151.py
    import platform

    def _instance_checking_exception(base_exception=Exception):
        """Return a decorator that turns a predicate into an exception class.

        The decorated ``instance_checker(inst)`` function is replaced by a
        subclass of ``base_exception`` carrying the function's name and
        docstring. ``isinstance(inst, <that class>)`` returns whatever
        ``instance_checker(inst)`` returns.
        """
        def wrapped(instance_checker):
            class TemporaryClass(base_exception):

                def __init__(self, *args, **kwargs):
                    if len(args) == 1 and isinstance(args[0], TemporaryClass):
                        # Built from a matching exception: copy over its
                        # non-dunder attributes instead of re-initialising.
                        unwrap_me = args[0]
                        for attr in dir(unwrap_me):
                            if not attr.startswith('__'):
                                setattr(self, attr, getattr(unwrap_me, attr))
                    else:
                        super(TemporaryClass, self).__init__(*args, **kwargs)

                class __metaclass__(type):
                    def __instancecheck__(cls, inst):
                        return instance_checker(inst)

                    def __subclasscheck__(cls, classinfo):
                        # Decide from the exception currently being
                        # handled rather than from ``classinfo``.
                        value = sys.exc_info()[1]
                        return isinstance(value, cls)

            TemporaryClass.__name__ = instance_checker.__name__
            TemporaryClass.__doc__ = instance_checker.__doc__
            return TemporaryClass

        return wrapped

    # Each shim below matches an EnvironmentError by its ``errno`` value.
    # ``_SENTINEL`` is the default so objects without ``errno`` never match.

    @_instance_checking_exception(EnvironmentError)
    def FileNotFoundError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT

    @_instance_checking_exception(EnvironmentError)
    def ProcessLookupError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH

    @_instance_checking_exception(EnvironmentError)
    def PermissionError(inst):
        return getattr(inst, 'errno', _SENTINEL) in (
            errno.EACCES, errno.EPERM)

    @_instance_checking_exception(EnvironmentError)
    def InterruptedError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.EINTR

    @_instance_checking_exception(EnvironmentError)
    def ChildProcessError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD

    @_instance_checking_exception(EnvironmentError)
    def FileExistsError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST

    # Self-test on non-CPython implementations: an OSError carrying EEXIST
    # must be caught by ``except FileExistsError``. If it is not, fail
    # loudly at import time.
    if platform.python_implementation() != "CPython":
        try:
            raise OSError(errno.EEXIST, "perm")
        except FileExistsError:
            pass
        except OSError:
            raise RuntimeError(
                "broken or incompatible Python implementation, see: "
                "https://github.com/giampaolo/psutil/issues/1659")

# --- stdlib additions


# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
try:
    from functools import lru_cache
except ImportError:
    try:
        from threading import RLock
    except ImportError:
        from dummy_threading import RLock

    _CacheInfo = collections.namedtuple(
        "CacheInfo", ["hits", "misses", "maxsize", "currsize"])

    class _HashedSeq(list):
        """List of key parts whose hash is computed once, at creation."""
        __slots__ = 'hashvalue'

        def __init__(self, tup, hash=hash):
            self[:] = tup
            self.hashvalue = hash(tup)

        def __hash__(self):
            return self.hashvalue

    def _make_key(args, kwds, typed,
                  kwd_mark=(_SENTINEL, ),
                  fasttypes=set((int, str, frozenset, type(None))),  # noqa
                  sorted=sorted, tuple=tuple, type=type, len=len):
        """Build a cache key from a call's positional and keyword arguments.

        Keyword items are appended in sorted order after ``kwd_mark``. When
        ``typed`` is true the argument types are appended as well. A key
        made of a single value whose type is in ``fasttypes`` is returned
        as that bare value; anything else is wrapped in ``_HashedSeq``.
        """
        key = args
        if kwds:
            sorted_items = sorted(kwds.items())
            key += kwd_mark
            for item in sorted_items:
                key += item
        if typed:
            key += tuple(type(v) for v in args)
            if kwds:
                key += tuple(type(v) for k, v in sorted_items)
        elif len(key) == 1 and type(key[0]) in fasttypes:
            return key[0]
        return _HashedSeq(key)

    def lru_cache(maxsize=100, typed=False):
        """Least-recently-used cache decorator, see:
        http://docs.python.org/3/library/functools.html#functools.lru_cache

        ``maxsize == 0`` disables caching (calls are only counted),
        ``maxsize is None`` caches without bound, any other value keeps at
        most ``maxsize`` results. ``typed`` is forwarded to ``_make_key``.
        The returned wrapper exposes ``cache_info()``, ``cache_clear()``
        and ``__wrapped__``.
        """
        def decorating_function(user_function):
            cache = {}
            stats = [0, 0]  # [hits, misses]; a list so closures can mutate it
            HITS, MISSES = 0, 1
            make_key = _make_key
            cache_get = cache.get
            _len = len
            lock = RLock()
            # Circular doubly linked list of [PREV, NEXT, KEY, RESULT]
            # links; ``root`` starts out pointing at itself.
            root = []
            root[:] = [root, root, None, None]
            nonlocal_root = [root]  # one-item list: rebindable from closures
            PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
            if maxsize == 0:
                def wrapper(*args, **kwds):
                    # No caching: just count the call as a miss.
                    result = user_function(*args, **kwds)
                    stats[MISSES] += 1
                    return result
            elif maxsize is None:
                def wrapper(*args, **kwds):
                    # Unbounded cache: plain dict lookup. ``root`` doubles
                    # as the "not found" marker.
                    key = make_key(args, kwds, typed)
                    result = cache_get(key, root)
                    if result is not root:
                        stats[HITS] += 1
                        return result
                    result = user_function(*args, **kwds)
                    cache[key] = result
                    stats[MISSES] += 1
                    return result
            else:
                def wrapper(*args, **kwds):
                    if kwds or typed:
                        key = make_key(args, kwds, typed)
                    else:
                        key = args
                    lock.acquire()
                    try:
                        link = cache_get(key)
                        if link is not None:
                            # Hit: unlink the entry and re-insert it just
                            # before root.
                            root, = nonlocal_root
                            link_prev, link_next, key, result = link
                            link_prev[NEXT] = link_next
                            link_next[PREV] = link_prev
                            last = root[PREV]
                            last[NEXT] = root[PREV] = link
                            link[PREV] = last
                            link[NEXT] = root
                            stats[HITS] += 1
                            return result
                    finally:
                        lock.release()
                    # The user function runs with the lock released.
                    result = user_function(*args, **kwds)
                    lock.acquire()
                    try:
                        root, = nonlocal_root
                        if key in cache:
                            # The key was stored while the lock was
                            # released; keep the existing entry.
                            pass
                        elif _len(cache) >= maxsize:
                            # Full: store the new result in the old root,
                            # then make the link after it the new (empty)
                            # root and evict that link's key.
                            oldroot = root
                            oldroot[KEY] = key
                            oldroot[RESULT] = result
                            root = nonlocal_root[0] = oldroot[NEXT]
                            oldkey = root[KEY]
                            root[KEY] = root[RESULT] = None
                            del cache[oldkey]
                            cache[key] = oldroot
                        else:
                            # Room left: append a new link before root.
                            last = root[PREV]
                            link = [last, root, key, result]
                            last[NEXT] = root[PREV] = cache[key] = link
                        stats[MISSES] += 1
                    finally:
                        lock.release()
                    return result

            def cache_info():
                """Report cache statistics"""
                lock.acquire()
                try:
                    return _CacheInfo(stats[HITS], stats[MISSES], maxsize,
                                      len(cache))
                finally:
                    lock.release()

            def cache_clear():
                """Clear the cache and cache statistics"""
                lock.acquire()
                try:
                    cache.clear()
                    root = nonlocal_root[0]
                    root[:] = [root, root, None, None]
                    stats[:] = [0, 0]
                finally:
                    lock.release()

            wrapper.__wrapped__ = user_function
            wrapper.cache_info = cache_info
            wrapper.cache_clear = cache_clear
            return functools.update_wrapper(wrapper, user_function)

        return decorating_function

# python 3.3
try:
    from shutil import which
except ImportError:
    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Given a command, mode, and a PATH string, return the path which
        conforms to the given mode on the PATH, or None if there is no such
        file.

        `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
        of os.environ.get("PATH"), or can be overridden with a custom search
        path.
        """
        def _access_check(fn, mode):
            # Must exist, satisfy ``mode`` and not be a directory.
            return (os.path.exists(fn) and os.access(fn, mode) and
                    not os.path.isdir(fn))

        # A command that already carries a directory part is checked as
        # given, without searching the path.
        if os.path.dirname(cmd):
            if _access_check(cmd, mode):
                return cmd
            return None

        if path is None:
            path = os.environ.get("PATH", os.defpath)
        if not path:
            return None
        path = path.split(os.pathsep)

        if sys.platform == "win32":
            # The current directory is searched first on Windows.
            if os.curdir not in path:
                path.insert(0, os.curdir)

            # Try each PATHEXT extension unless ``cmd`` already ends with
            # one of them.
            pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
            if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
                files = [cmd]
            else:
                files = [cmd + ext for ext in pathext]
        else:
            files = [cmd]

        seen = set()
        for dir_ in path:
            # Skip directories already visited (compared case-normalised).
            normdir = os.path.normcase(dir_)
            if normdir not in seen:
                seen.add(normdir)
                for thefile in files:
                    name = os.path.join(dir_, thefile)
                    if _access_check(name, mode):
                        return name
        return None

# python 3.3
try:
    from shutil import get_terminal_size
except ImportError:
    def get_terminal_size(fallback=(80, 24)):
        """Return the terminal size as a 2-item tuple.

        The size is queried with a TIOCGWINSZ ioctl on file descriptor 1,
        and the two unpacked values are returned in swapped order.
        ``fallback`` is returned when the required modules cannot be
        imported or when the query fails for any reason.
        """
        try:
            import fcntl
            import struct
            import termios
        except ImportError:
            return fallback
        else:
            try:
                # This should work on Linux.
                res = struct.unpack(
                    'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
                return (res[1], res[0])
            except Exception:
                # Deliberate best-effort: any failure yields the fallback.
                return fallback

# python 3.3
try:
    from subprocess import TimeoutExpired as SubprocessTimeoutExpired
except ImportError:
    class SubprocessTimeoutExpired(Exception):
        """Stand-in used when ``subprocess.TimeoutExpired`` is missing."""

# python 3.5
try:
    from contextlib import redirect_stderr
except ImportError:
    @contextlib.contextmanager
    def redirect_stderr(new_target):
        """Context manager that temporarily rebinds ``sys.stderr``.

        ``sys.stderr`` is set to ``new_target`` for the duration of the
        ``with`` block, and ``new_target`` is what ``as`` binds to. The
        previous ``sys.stderr`` is put back on exit, even if the block
        raises.
        """
        original = sys.stderr
        try:
            sys.stderr = new_target
            yield new_target
        finally:
            sys.stderr = original