Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/astroid/brain/brain_multiprocessing.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

29 statements  

1# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html 

2# For details: https://github.com/pylint-dev/astroid/blob/main/LICENSE 

3# Copyright (c) https://github.com/pylint-dev/astroid/blob/main/CONTRIBUTORS.txt 

4 

5from astroid.bases import BoundMethod 

6from astroid.brain.helpers import register_module_extender 

7from astroid.builder import parse 

8from astroid.exceptions import InferenceError 

9from astroid.manager import AstroidManager 

10from astroid.nodes.scoped_nodes import FunctionDef 

11 

12 

13def _multiprocessing_transform(): 

14 module = parse(""" 

15 from multiprocessing.managers import SyncManager 

16 def Manager(): 

17 return SyncManager() 

18 """) 

19 # Multiprocessing uses a getattr lookup inside contexts, 

20 # in order to get the attributes they need. Since it's extremely 

21 # dynamic, we use this approach to fake it. 

22 node = parse(""" 

23 from multiprocessing.context import DefaultContext, BaseContext 

24 default = DefaultContext() 

25 base = BaseContext() 

26 """) 

27 try: 

28 context = next(node["default"].infer()) 

29 base = next(node["base"].infer()) 

30 except (InferenceError, StopIteration): 

31 return module 

32 

33 for node in (context, base): 

34 for key, value in node.locals.items(): 

35 if key.startswith("_"): 

36 continue 

37 

38 value = value[0] 

39 if isinstance(value, FunctionDef): 

40 # We need to rebound this, since otherwise 

41 # it will have an extra argument (self). 

42 value = BoundMethod(value, node) 

43 module[key] = value 

44 return module 

45 

46 

47def _multiprocessing_managers_transform(): 

48 return parse(""" 

49 import array 

50 import threading 

51 import multiprocessing.pool as pool 

52 import queue 

53 

54 class Namespace(object): 

55 pass 

56 

57 class Value(object): 

58 def __init__(self, typecode, value, lock=True): 

59 self._typecode = typecode 

60 self._value = value 

61 def get(self): 

62 return self._value 

63 def set(self, value): 

64 self._value = value 

65 def __repr__(self): 

66 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 

67 value = property(get, set) 

68 

69 def Array(typecode, sequence, lock=True): 

70 return array.array(typecode, sequence) 

71 

72 class SyncManager(object): 

73 Queue = JoinableQueue = queue.Queue 

74 Event = threading.Event 

75 RLock = threading.RLock 

76 Lock = threading.Lock 

77 BoundedSemaphore = threading.BoundedSemaphore 

78 Condition = threading.Condition 

79 Barrier = threading.Barrier 

80 Pool = pool.Pool 

81 list = list 

82 dict = dict 

83 Value = Value 

84 Array = Array 

85 Namespace = Namespace 

86 __enter__ = lambda self: self 

87 __exit__ = lambda *args: args 

88 

89 def start(self, initializer=None, initargs=None): 

90 pass 

91 def shutdown(self): 

92 pass 

93 """) 

94 

95 

96def register(manager: AstroidManager) -> None: 

97 register_module_extender( 

98 manager, "multiprocessing.managers", _multiprocessing_managers_transform 

99 ) 

100 register_module_extender(manager, "multiprocessing", _multiprocessing_transform)