Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/psutil/_compat.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

302 statements  

1# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 

2# Use of this source code is governed by a BSD-style license that can be 

3# found in the LICENSE file. 

4 

5"""Module which provides compatibility with older Python versions. 

6This is more future-compatible rather than the opposite (prefer latest 

7Python 3 way of doing things). 

8""" 

9 

10import collections 

11import contextlib 

12import errno 

13import functools 

14import os 

15import sys 

16import types 

17 

18 

19# fmt: off 

20__all__ = [ 

21 # constants 

22 "PY3", 

23 # builtins 

24 "long", "range", "super", "unicode", "basestring", 

25 # literals 

26 "u", "b", 

27 # collections module 

28 "lru_cache", 

29 # shutil module 

30 "which", "get_terminal_size", 

31 # contextlib module 

32 "redirect_stderr", 

33 # python 3 exceptions 

34 "FileNotFoundError", "PermissionError", "ProcessLookupError", 

35 "InterruptedError", "ChildProcessError", "FileExistsError", 

36] 

37# fmt: on 

38 

39 

40PY3 = sys.version_info[0] >= 3 

41_SENTINEL = object() 

42 

43if PY3: 

44 long = int 

45 xrange = range 

46 unicode = str 

47 basestring = str 

48 range = range 

49 

50 def u(s): 

51 return s 

52 

53 def b(s): 

54 return s.encode("latin-1") 

55 

56else: 

57 long = long 

58 range = xrange 

59 unicode = unicode 

60 basestring = basestring 

61 

62 def u(s): 

63 return unicode(s, "unicode_escape") 

64 

65 def b(s): 

66 return s 

67 

68 

69# --- builtins 

70 

71 

72# Python 3 super(). 

73# Taken from "future" package. 

74# Credit: Ryan Kelly 

75if PY3: 

76 super = super 

77else: 

78 _builtin_super = super 

79 

80 def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1): 

81 """Like Python 3 builtin super(). If called without any arguments 

82 it attempts to infer them at runtime. 

83 """ 

84 if type_ is _SENTINEL: 

85 f = sys._getframe(framedepth) 

86 try: 

87 # Get the function's first positional argument. 

88 type_or_obj = f.f_locals[f.f_code.co_varnames[0]] 

89 except (IndexError, KeyError): 

90 msg = 'super() used in a function with no args' 

91 raise RuntimeError(msg) 

92 try: 

93 # Get the MRO so we can crawl it. 

94 mro = type_or_obj.__mro__ 

95 except (AttributeError, RuntimeError): 

96 try: 

97 mro = type_or_obj.__class__.__mro__ 

98 except AttributeError: 

99 msg = 'super() used in a non-newstyle class' 

100 raise RuntimeError(msg) 

101 for type_ in mro: 

102 # Find the class that owns the currently-executing method. 

103 for meth in type_.__dict__.values(): 

104 # Drill down through any wrappers to the underlying func. 

105 # This handles e.g. classmethod() and staticmethod(). 

106 try: 

107 while not isinstance(meth, types.FunctionType): 

108 if isinstance(meth, property): 

109 # Calling __get__ on the property will invoke 

110 # user code which might throw exceptions or 

111 # have side effects 

112 meth = meth.fget 

113 else: 

114 try: 

115 meth = meth.__func__ 

116 except AttributeError: 

117 meth = meth.__get__(type_or_obj, type_) 

118 except (AttributeError, TypeError): 

119 continue 

120 if meth.func_code is f.f_code: 

121 break # found 

122 else: 

123 # Not found. Move onto the next class in MRO. 

124 continue 

125 break # found 

126 else: 

127 msg = 'super() called outside a method' 

128 raise RuntimeError(msg) 

129 

130 # Dispatch to builtin super(). 

131 if type_or_obj is not _SENTINEL: 

132 return _builtin_super(type_, type_or_obj) 

133 return _builtin_super(type_) 

134 

135 

136# --- exceptions 

137 

138 

139if PY3: 

140 FileNotFoundError = FileNotFoundError # NOQA 

141 PermissionError = PermissionError # NOQA 

142 ProcessLookupError = ProcessLookupError # NOQA 

143 InterruptedError = InterruptedError # NOQA 

144 ChildProcessError = ChildProcessError # NOQA 

145 FileExistsError = FileExistsError # NOQA 

146else: 

147 # https://github.com/PythonCharmers/python-future/blob/exceptions/ 

148 # src/future/types/exceptions/pep3151.py 

149 import platform 

150 

151 def _instance_checking_exception(base_exception=Exception): 

152 def wrapped(instance_checker): 

153 class TemporaryClass(base_exception): 

154 def __init__(self, *args, **kwargs): 

155 if len(args) == 1 and isinstance(args[0], TemporaryClass): 

156 unwrap_me = args[0] 

157 for attr in dir(unwrap_me): 

158 if not attr.startswith('__'): 

159 setattr(self, attr, getattr(unwrap_me, attr)) 

160 else: 

161 super(TemporaryClass, self).__init__( # noqa 

162 *args, **kwargs 

163 ) 

164 

165 class __metaclass__(type): 

166 def __instancecheck__(cls, inst): 

167 return instance_checker(inst) 

168 

169 def __subclasscheck__(cls, classinfo): 

170 value = sys.exc_info()[1] 

171 return isinstance(value, cls) 

172 

173 TemporaryClass.__name__ = instance_checker.__name__ 

174 TemporaryClass.__doc__ = instance_checker.__doc__ 

175 return TemporaryClass 

176 

177 return wrapped 

178 

179 @_instance_checking_exception(EnvironmentError) 

180 def FileNotFoundError(inst): 

181 return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT 

182 

183 @_instance_checking_exception(EnvironmentError) 

184 def ProcessLookupError(inst): 

185 return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH 

186 

187 @_instance_checking_exception(EnvironmentError) 

188 def PermissionError(inst): 

189 return getattr(inst, 'errno', _SENTINEL) in (errno.EACCES, errno.EPERM) 

190 

191 @_instance_checking_exception(EnvironmentError) 

192 def InterruptedError(inst): 

193 return getattr(inst, 'errno', _SENTINEL) == errno.EINTR 

194 

195 @_instance_checking_exception(EnvironmentError) 

196 def ChildProcessError(inst): 

197 return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD 

198 

199 @_instance_checking_exception(EnvironmentError) 

200 def FileExistsError(inst): 

201 return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST 

202 

203 if platform.python_implementation() != "CPython": 

204 try: 

205 raise OSError(errno.EEXIST, "perm") 

206 except FileExistsError: 

207 pass 

208 except OSError: 

209 msg = ( 

210 "broken or incompatible Python implementation, see: " 

211 "https://github.com/giampaolo/psutil/issues/1659" 

212 ) 

213 raise RuntimeError(msg) 

214 

215 

216# --- stdlib additions 

217 

218 

219# py 3.2 functools.lru_cache 

220# Taken from: http://code.activestate.com/recipes/578078 

221# Credit: Raymond Hettinger 

222try: 

223 from functools import lru_cache 

224except ImportError: 

225 try: 

226 from threading import RLock 

227 except ImportError: 

228 from dummy_threading import RLock 

229 

230 _CacheInfo = collections.namedtuple( 

231 "CacheInfo", ["hits", "misses", "maxsize", "currsize"] 

232 ) 

233 

234 class _HashedSeq(list): 

235 __slots__ = ('hashvalue',) 

236 

237 def __init__(self, tup, hash=hash): 

238 self[:] = tup 

239 self.hashvalue = hash(tup) 

240 

241 def __hash__(self): 

242 return self.hashvalue 

243 

244 def _make_key( 

245 args, 

246 kwds, 

247 typed, 

248 kwd_mark=(_SENTINEL,), 

249 fasttypes=set((int, str, frozenset, type(None))), # noqa 

250 sorted=sorted, 

251 tuple=tuple, 

252 type=type, 

253 len=len, 

254 ): 

255 key = args 

256 if kwds: 

257 sorted_items = sorted(kwds.items()) 

258 key += kwd_mark 

259 for item in sorted_items: 

260 key += item 

261 if typed: 

262 key += tuple(type(v) for v in args) 

263 if kwds: 

264 key += tuple(type(v) for k, v in sorted_items) 

265 elif len(key) == 1 and type(key[0]) in fasttypes: 

266 return key[0] 

267 return _HashedSeq(key) 

268 

269 def lru_cache(maxsize=100, typed=False): 

270 """Least-recently-used cache decorator, see: 

271 http://docs.python.org/3/library/functools.html#functools.lru_cache. 

272 """ 

273 

274 def decorating_function(user_function): 

275 cache = {} 

276 stats = [0, 0] 

277 HITS, MISSES = 0, 1 

278 make_key = _make_key 

279 cache_get = cache.get 

280 _len = len 

281 lock = RLock() 

282 root = [] 

283 root[:] = [root, root, None, None] 

284 nonlocal_root = [root] 

285 PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 

286 if maxsize == 0: 

287 

288 def wrapper(*args, **kwds): 

289 result = user_function(*args, **kwds) 

290 stats[MISSES] += 1 

291 return result 

292 

293 elif maxsize is None: 

294 

295 def wrapper(*args, **kwds): 

296 key = make_key(args, kwds, typed) 

297 result = cache_get(key, root) 

298 if result is not root: 

299 stats[HITS] += 1 

300 return result 

301 result = user_function(*args, **kwds) 

302 cache[key] = result 

303 stats[MISSES] += 1 

304 return result 

305 

306 else: 

307 

308 def wrapper(*args, **kwds): 

309 if kwds or typed: 

310 key = make_key(args, kwds, typed) 

311 else: 

312 key = args 

313 lock.acquire() 

314 try: 

315 link = cache_get(key) 

316 if link is not None: 

317 (root,) = nonlocal_root 

318 link_prev, link_next, key, result = link 

319 link_prev[NEXT] = link_next 

320 link_next[PREV] = link_prev 

321 last = root[PREV] 

322 last[NEXT] = root[PREV] = link 

323 link[PREV] = last 

324 link[NEXT] = root 

325 stats[HITS] += 1 

326 return result 

327 finally: 

328 lock.release() 

329 result = user_function(*args, **kwds) 

330 lock.acquire() 

331 try: 

332 (root,) = nonlocal_root 

333 if key in cache: 

334 pass 

335 elif _len(cache) >= maxsize: 

336 oldroot = root 

337 oldroot[KEY] = key 

338 oldroot[RESULT] = result 

339 root = nonlocal_root[0] = oldroot[NEXT] 

340 oldkey = root[KEY] 

341 root[KEY] = root[RESULT] = None 

342 del cache[oldkey] 

343 cache[key] = oldroot 

344 else: 

345 last = root[PREV] 

346 link = [last, root, key, result] 

347 last[NEXT] = root[PREV] = cache[key] = link 

348 stats[MISSES] += 1 

349 finally: 

350 lock.release() 

351 return result 

352 

353 def cache_info(): 

354 """Report cache statistics.""" 

355 lock.acquire() 

356 try: 

357 return _CacheInfo( 

358 stats[HITS], stats[MISSES], maxsize, len(cache) 

359 ) 

360 finally: 

361 lock.release() 

362 

363 def cache_clear(): 

364 """Clear the cache and cache statistics.""" 

365 lock.acquire() 

366 try: 

367 cache.clear() 

368 root = nonlocal_root[0] 

369 root[:] = [root, root, None, None] 

370 stats[:] = [0, 0] 

371 finally: 

372 lock.release() 

373 

374 wrapper.__wrapped__ = user_function 

375 wrapper.cache_info = cache_info 

376 wrapper.cache_clear = cache_clear 

377 return functools.update_wrapper(wrapper, user_function) 

378 

379 return decorating_function 

380 

381 

382# python 3.3 

383try: 

384 from shutil import which 

385except ImportError: 

386 

387 def which(cmd, mode=os.F_OK | os.X_OK, path=None): 

388 """Given a command, mode, and a PATH string, return the path which 

389 conforms to the given mode on the PATH, or None if there is no such 

390 file. 

391 

392 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 

393 of os.environ.get("PATH"), or can be overridden with a custom search 

394 path. 

395 """ 

396 

397 def _access_check(fn, mode): 

398 return ( 

399 os.path.exists(fn) 

400 and os.access(fn, mode) 

401 and not os.path.isdir(fn) 

402 ) 

403 

404 if os.path.dirname(cmd): 

405 if _access_check(cmd, mode): 

406 return cmd 

407 return None 

408 

409 if path is None: 

410 path = os.environ.get("PATH", os.defpath) 

411 if not path: 

412 return None 

413 path = path.split(os.pathsep) 

414 

415 if sys.platform == "win32": 

416 if os.curdir not in path: 

417 path.insert(0, os.curdir) 

418 

419 pathext = os.environ.get("PATHEXT", "").split(os.pathsep) 

420 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 

421 files = [cmd] 

422 else: 

423 files = [cmd + ext for ext in pathext] 

424 else: 

425 files = [cmd] 

426 

427 seen = set() 

428 for dir in path: 

429 normdir = os.path.normcase(dir) 

430 if normdir not in seen: 

431 seen.add(normdir) 

432 for thefile in files: 

433 name = os.path.join(dir, thefile) 

434 if _access_check(name, mode): 

435 return name 

436 return None 

437 

438 

439# python 3.3 

440try: 

441 from shutil import get_terminal_size 

442except ImportError: 

443 

444 def get_terminal_size(fallback=(80, 24)): 

445 try: 

446 import fcntl 

447 import struct 

448 import termios 

449 except ImportError: 

450 return fallback 

451 else: 

452 try: 

453 # This should work on Linux. 

454 res = struct.unpack( 

455 'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234') 

456 ) 

457 return (res[1], res[0]) 

458 except Exception: # noqa: BLE001 

459 return fallback 

460 

461 

462# python 3.3 

463try: 

464 from subprocess import TimeoutExpired as SubprocessTimeoutExpired 

465except ImportError: 

466 

467 class SubprocessTimeoutExpired(Exception): 

468 pass 

469 

470 

471# python 3.5 

472try: 

473 from contextlib import redirect_stderr 

474except ImportError: 

475 

476 @contextlib.contextmanager 

477 def redirect_stderr(new_target): 

478 original = sys.stderr 

479 try: 

480 sys.stderr = new_target 

481 yield new_target 

482 finally: 

483 sys.stderr = original