Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/astroid/brain/brain_multiprocessing.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

28 statements  

1# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html 

2# For details: https://github.com/pylint-dev/astroid/blob/main/LICENSE 

3# Copyright (c) https://github.com/pylint-dev/astroid/blob/main/CONTRIBUTORS.txt 

4 

5from astroid.bases import BoundMethod 

6from astroid.brain.helpers import register_module_extender 

7from astroid.builder import parse 

8from astroid.exceptions import InferenceError 

9from astroid.manager import AstroidManager 

10from astroid.nodes.scoped_nodes import FunctionDef 

11 

12 

13def _multiprocessing_transform(): 

14 module = parse( 

15 """ 

16 from multiprocessing.managers import SyncManager 

17 def Manager(): 

18 return SyncManager() 

19 """ 

20 ) 

21 # Multiprocessing uses a getattr lookup inside contexts, 

22 # in order to get the attributes they need. Since it's extremely 

23 # dynamic, we use this approach to fake it. 

24 node = parse( 

25 """ 

26 from multiprocessing.context import DefaultContext, BaseContext 

27 default = DefaultContext() 

28 base = BaseContext() 

29 """ 

30 ) 

31 try: 

32 context = next(node["default"].infer()) 

33 base = next(node["base"].infer()) 

34 except (InferenceError, StopIteration): 

35 return module 

36 

37 for node in (context, base): 

38 for key, value in node.locals.items(): 

39 if key.startswith("_"): 

40 continue 

41 

42 value = value[0] 

43 if isinstance(value, FunctionDef): 

44 # We need to rebound this, since otherwise 

45 # it will have an extra argument (self). 

46 value = BoundMethod(value, node) 

47 module[key] = value 

48 return module 

49 

50 

51def _multiprocessing_managers_transform(): 

52 return parse( 

53 """ 

54 import array 

55 import threading 

56 import multiprocessing.pool as pool 

57 import queue 

58 

59 class Namespace(object): 

60 pass 

61 

62 class Value(object): 

63 def __init__(self, typecode, value, lock=True): 

64 self._typecode = typecode 

65 self._value = value 

66 def get(self): 

67 return self._value 

68 def set(self, value): 

69 self._value = value 

70 def __repr__(self): 

71 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 

72 value = property(get, set) 

73 

74 def Array(typecode, sequence, lock=True): 

75 return array.array(typecode, sequence) 

76 

77 class SyncManager(object): 

78 Queue = JoinableQueue = queue.Queue 

79 Event = threading.Event 

80 RLock = threading.RLock 

81 Lock = threading.Lock 

82 BoundedSemaphore = threading.BoundedSemaphore 

83 Condition = threading.Condition 

84 Barrier = threading.Barrier 

85 Pool = pool.Pool 

86 list = list 

87 dict = dict 

88 Value = Value 

89 Array = Array 

90 Namespace = Namespace 

91 __enter__ = lambda self: self 

92 __exit__ = lambda *args: args 

93 

94 def start(self, initializer=None, initargs=None): 

95 pass 

96 def shutdown(self): 

97 pass 

98 """ 

99 ) 

100 

101 

102def register(manager: AstroidManager) -> None: 

103 register_module_extender( 

104 manager, "multiprocessing.managers", _multiprocessing_managers_transform 

105 ) 

106 register_module_extender(manager, "multiprocessing", _multiprocessing_transform)