Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/psutil-5.9.9-py3.8-linux-x86_64.egg/psutil/_compat.py: 13%

298 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-02 06:13 +0000

1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 

2# Use of this source code is governed by a BSD-style license that can be 

3# found in the LICENSE file. 

4 

5"""Module which provides compatibility with older Python versions. 

It favours forward compatibility over backward compatibility (it prefers
the latest Python 3 way of doing things).

8""" 

9 

10import collections 

11import contextlib 

12import errno 

13import functools 

14import os 

15import sys 

16import types 

17 

18 

# Public names re-exported by this compatibility module, grouped by the
# place each one comes from in modern Python.
# fmt: off
__all__ = [
    # constants
    "PY3",
    # builtins
    "long", "range", "super", "unicode", "basestring",
    # literals
    "b",
    # functools module
    "lru_cache",
    # shutil module
    "which", "get_terminal_size",
    # contextlib module
    "redirect_stderr",
    # python 3 exceptions
    "FileNotFoundError", "PermissionError", "ProcessLookupError",
    "InterruptedError", "ChildProcessError", "FileExistsError",
]
# fmt: on

38 

39 

# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3
# Unique marker object used to mean "no value supplied".
_SENTINEL = object()

if PY3:
    # Python 2 builtin names mapped onto their Python 3 equivalents.
    long = int
    xrange = range
    unicode = str
    basestring = str
    range = range

    def b(s):
        """Return the text string *s* encoded to bytes as latin-1."""
        return s.encode("latin-1")

else:
    # Python 2: re-export the builtins; `range` becomes the lazy `xrange`.
    long = long
    range = xrange
    unicode = unicode
    basestring = basestring

    def b(s):
        """Return *s* unchanged (no encoding step on this branch)."""
        return s

61 

62 

# --- builtins


# Python 3 super().
# Taken from "future" package.
# Credit: Ryan Kelly
if PY3:
    super = super
else:
    _builtin_super = super

    def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1):
        """Like Python 3 builtin super(). If called without any arguments
        it attempts to infer them at runtime.

        `type_` and `type_or_obj` are forwarded to the builtin super()
        when given. `framedepth` is the number of stack frames to walk up
        to reach the calling method (1 means the direct caller).

        Raises RuntimeError when the arguments cannot be inferred: the
        caller has no positional argument, its first argument has no MRO,
        or no class in that MRO owns the calling code object.
        """
        if type_ is _SENTINEL:
            f = sys._getframe(framedepth)
            try:
                # Get the function's first positional argument.
                type_or_obj = f.f_locals[f.f_code.co_varnames[0]]
            except (IndexError, KeyError):
                msg = 'super() used in a function with no args'
                raise RuntimeError(msg)
            try:
                # Get the MRO so we can crawl it.
                mro = type_or_obj.__mro__
            except (AttributeError, RuntimeError):
                try:
                    mro = type_or_obj.__class__.__mro__
                except AttributeError:
                    msg = 'super() used in a non-newstyle class'
                    raise RuntimeError(msg)
            for type_ in mro:
                # Find the class that owns the currently-executing method.
                for meth in type_.__dict__.values():
                    # Drill down through any wrappers to the underlying func.
                    # This handles e.g. classmethod() and staticmethod().
                    try:
                        while not isinstance(meth, types.FunctionType):
                            if isinstance(meth, property):
                                # Calling __get__ on the property will invoke
                                # user code which might throw exceptions or
                                # have side effects
                                meth = meth.fget
                            else:
                                try:
                                    meth = meth.__func__
                                except AttributeError:
                                    meth = meth.__get__(type_or_obj, type_)
                    except (AttributeError, TypeError):
                        # Not something that wraps a plain function.
                        continue
                    if meth.func_code is f.f_code:
                        break  # found
                else:
                    # Not found. Move onto the next class in MRO.
                    continue
                break  # found
            else:
                msg = 'super() called outside a method'
                raise RuntimeError(msg)

        # Dispatch to builtin super().
        if type_or_obj is not _SENTINEL:
            return _builtin_super(type_, type_or_obj)
        return _builtin_super(type_)

128 

129 

# --- exceptions


if PY3:
    FileNotFoundError = FileNotFoundError  # NOQA
    PermissionError = PermissionError  # NOQA
    ProcessLookupError = ProcessLookupError  # NOQA
    InterruptedError = InterruptedError  # NOQA
    ChildProcessError = ChildProcessError  # NOQA
    FileExistsError = FileExistsError  # NOQA
else:
    # https://github.com/PythonCharmers/python-future/blob/exceptions/
    #     src/future/types/exceptions/pep3151.py
    import platform

    def _instance_checking_exception(base_exception=Exception):
        """Return a decorator turning a predicate into an exception class.

        The decorated function receives an exception instance and returns
        True when that instance should count as the generated class.
        """

        def wrapped(instance_checker):
            # NOTE: name and doc of this class are overwritten below with
            # those of `instance_checker`, so it is documented with comments.
            class TemporaryClass(base_exception):
                def __init__(self, *args, **kwargs):
                    if len(args) == 1 and isinstance(args[0], TemporaryClass):
                        # Built from a matching exception: copy over its
                        # non-dunder attributes instead of re-initialising.
                        unwrap_me = args[0]
                        for attr in dir(unwrap_me):
                            if not attr.startswith('__'):
                                setattr(self, attr, getattr(unwrap_me, attr))
                    else:
                        super(TemporaryClass, self).__init__(  # noqa
                            *args, **kwargs
                        )

                class __metaclass__(type):
                    def __instancecheck__(cls, inst):
                        # isinstance() is decided by the predicate.
                        return instance_checker(inst)

                    def __subclasscheck__(cls, classinfo):
                        # Decided from the exception currently being
                        # handled, not from `classinfo`.
                        value = sys.exc_info()[1]
                        return isinstance(value, cls)

            TemporaryClass.__name__ = instance_checker.__name__
            TemporaryClass.__doc__ = instance_checker.__doc__
            return TemporaryClass

        return wrapped

    # The predicates below carry no docstring on purpose: their __doc__ is
    # copied onto the generated exception class.

    # Matches instances whose errno is ENOENT.
    @_instance_checking_exception(EnvironmentError)
    def FileNotFoundError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT

    # Matches instances whose errno is ESRCH.
    @_instance_checking_exception(EnvironmentError)
    def ProcessLookupError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH

    # Matches instances whose errno is EACCES or EPERM.
    @_instance_checking_exception(EnvironmentError)
    def PermissionError(inst):
        return getattr(inst, 'errno', _SENTINEL) in (errno.EACCES, errno.EPERM)

    # Matches instances whose errno is EINTR.
    @_instance_checking_exception(EnvironmentError)
    def InterruptedError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.EINTR

    # Matches instances whose errno is ECHILD.
    @_instance_checking_exception(EnvironmentError)
    def ChildProcessError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD

    # Matches instances whose errno is EEXIST.
    @_instance_checking_exception(EnvironmentError)
    def FileExistsError(inst):
        return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST

    # Import-time self check on non-CPython implementations: make sure an
    # OSError carrying EEXIST is really caught by the emulated class.
    if platform.python_implementation() != "CPython":
        try:
            raise OSError(errno.EEXIST, "perm")
        except FileExistsError:
            pass
        except OSError:
            msg = (
                "broken or incompatible Python implementation, see: "
                "https://github.com/giampaolo/psutil/issues/1659"
            )
            raise RuntimeError(msg)

208 

209 

# --- stdlib additions


# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
try:
    from functools import lru_cache
except ImportError:
    try:
        from threading import RLock
    except ImportError:
        from dummy_threading import RLock

    _CacheInfo = collections.namedtuple(
        "CacheInfo", ["hits", "misses", "maxsize", "currsize"]
    )

    class _HashedSeq(list):
        """List holding a cache key whose hash is computed only once."""

        __slots__ = ('hashvalue',)

        def __init__(self, tup, hash=hash):
            self[:] = tup
            self.hashvalue = hash(tup)

        def __hash__(self):
            return self.hashvalue

    def _make_key(
        args,
        kwds,
        typed,
        kwd_mark=(_SENTINEL,),
        fasttypes=set((int, str, frozenset, type(None))),  # noqa
        sorted=sorted,
        tuple=tuple,
        type=type,
        len=len,
    ):
        """Build a cache key from positional and keyword arguments.

        Keyword items are appended in sorted order after `kwd_mark`. When
        `typed` is true the argument types are appended as well. A lone
        argument of one of the `fasttypes` is returned as the key itself.
        """
        key = args
        if kwds:
            sorted_items = sorted(kwds.items())
            key += kwd_mark
            for item in sorted_items:
                key += item
        if typed:
            key += tuple(type(v) for v in args)
            if kwds:
                key += tuple(type(v) for k, v in sorted_items)
        elif len(key) == 1 and type(key[0]) in fasttypes:
            return key[0]
        return _HashedSeq(key)

    def lru_cache(maxsize=100, typed=False):
        """Least-recently-used cache decorator, see:
        http://docs.python.org/3/library/functools.html#functools.lru_cache.
        """

        def decorating_function(user_function):
            cache = {}
            stats = [0, 0]  # mutable so the closures can update the counters
            HITS, MISSES = 0, 1
            make_key = _make_key
            cache_get = cache.get
            _len = len
            lock = RLock()
            # Circular doubly linked list; `root` is its placeholder node.
            root = []
            root[:] = [root, root, None, None]
            # One-element list so closures can rebind the root (no
            # `nonlocal` statement is used here).
            nonlocal_root = [root]
            PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
            if maxsize == 0:

                def wrapper(*args, **kwds):
                    # No caching: only count the call as a miss.
                    result = user_function(*args, **kwds)
                    stats[MISSES] += 1
                    return result

            elif maxsize is None:

                def wrapper(*args, **kwds):
                    # Unbounded cache: plain dict lookup, no ordering.
                    key = make_key(args, kwds, typed)
                    # `root` doubles as the "not found" marker.
                    result = cache_get(key, root)
                    if result is not root:
                        stats[HITS] += 1
                        return result
                    result = user_function(*args, **kwds)
                    cache[key] = result
                    stats[MISSES] += 1
                    return result

            else:

                def wrapper(*args, **kwds):
                    # Size-limited cache tracking access recency.
                    if kwds or typed:
                        key = make_key(args, kwds, typed)
                    else:
                        key = args
                    with lock:
                        link = cache_get(key)
                        if link is not None:
                            # Hit: unlink the node and re-insert it just
                            # before root (the most recently used slot).
                            (root,) = nonlocal_root
                            link_prev, link_next, key, result = link
                            link_prev[NEXT] = link_next
                            link_next[PREV] = link_prev
                            last = root[PREV]
                            last[NEXT] = root[PREV] = link
                            link[PREV] = last
                            link[NEXT] = root
                            stats[HITS] += 1
                            return result
                    # The user function runs with the lock released.
                    result = user_function(*args, **kwds)
                    with lock:
                        (root,) = nonlocal_root
                        if key in cache:
                            # The key was stored while the lock was
                            # released; leave the existing entry alone.
                            pass
                        elif _len(cache) >= maxsize:
                            # Full: store the new entry in the old root
                            # and promote the oldest node to empty root.
                            oldroot = root
                            oldroot[KEY] = key
                            oldroot[RESULT] = result
                            root = nonlocal_root[0] = oldroot[NEXT]
                            oldkey = root[KEY]
                            root[KEY] = root[RESULT] = None
                            del cache[oldkey]
                            cache[key] = oldroot
                        else:
                            # Room left: append a new node before root.
                            last = root[PREV]
                            link = [last, root, key, result]
                            last[NEXT] = root[PREV] = cache[key] = link
                        stats[MISSES] += 1
                    return result

            def cache_info():
                """Report cache statistics."""
                with lock:
                    return _CacheInfo(
                        stats[HITS], stats[MISSES], maxsize, len(cache)
                    )

            def cache_clear():
                """Clear the cache and cache statistics."""
                with lock:
                    cache.clear()
                    root = nonlocal_root[0]
                    root[:] = [root, root, None, None]
                    stats[:] = [0, 0]

            wrapper.__wrapped__ = user_function
            wrapper.cache_info = cache_info
            wrapper.cache_clear = cache_clear
            return functools.update_wrapper(wrapper, user_function)

        return decorating_function

374 

375 

# python 3.3
try:
    from shutil import which
except ImportError:

    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Given a command, mode, and a PATH string, return the path which
        conforms to the given mode on the PATH, or None if there is no such
        file.

        `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
        of os.environ.get("PATH"), or can be overridden with a custom search
        path.
        """

        def _access_check(fn, mode):
            # An existing, accessible non-directory.
            return (
                os.path.exists(fn)
                and os.access(fn, mode)
                and not os.path.isdir(fn)
            )

        # A command containing a directory part is checked as given and
        # is never searched for on the path.
        if os.path.dirname(cmd):
            if _access_check(cmd, mode):
                return cmd
            return None

        if path is None:
            path = os.environ.get("PATH", os.defpath)
        if not path:
            return None
        path = path.split(os.pathsep)

        if sys.platform == "win32":
            # On Windows the current directory is searched first.
            if os.curdir not in path:
                path.insert(0, os.curdir)

            # Only try the PATHEXT extensions when the command does not
            # already end with one of them.
            pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
            if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
                files = [cmd]
            else:
                files = [cmd + ext for ext in pathext]
        else:
            files = [cmd]

        seen = set()
        for directory in path:
            # Skip directories already visited (compared case-normalised).
            normdir = os.path.normcase(directory)
            if normdir not in seen:
                seen.add(normdir)
                for thefile in files:
                    name = os.path.join(directory, thefile)
                    if _access_check(name, mode):
                        return name
        return None

431 

432 

# python 3.3
try:
    from shutil import get_terminal_size
except ImportError:

    def get_terminal_size(fallback=(80, 24)):
        """Return the terminal size as a 2-tuple, or `fallback`.

        `fallback` is returned when the fcntl / struct / termios modules
        cannot be imported, or when the TIOCGWINSZ ioctl on file
        descriptor 1 fails for any reason.
        """
        try:
            import fcntl
            import struct
            import termios
        except ImportError:
            return fallback
        else:
            try:
                # This should work on Linux.
                res = struct.unpack(
                    'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234')
                )
                # The two unpacked shorts are returned in swapped order.
                return (res[1], res[0])
            except Exception:  # noqa: BLE001
                # Best effort: any failure means "size unknown".
                return fallback

454 

455 

# python 3.3
try:
    from subprocess import TimeoutExpired as SubprocessTimeoutExpired
except ImportError:

    class SubprocessTimeoutExpired(Exception):
        """Stand-in used when subprocess has no TimeoutExpired."""

463 

464 

# python 3.5
try:
    from contextlib import redirect_stderr
except ImportError:

    @contextlib.contextmanager
    def redirect_stderr(new_target):
        """Context manager temporarily replacing sys.stderr.

        sys.stderr is set to `new_target` inside the `with` body, and
        `new_target` is what the `with ... as` clause receives. The
        previous sys.stderr is put back on exit, even on error.
        """
        original = sys.stderr
        try:
            sys.stderr = new_target
            yield new_target
        finally:
            sys.stderr = original