1# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
2# For details: https://github.com/pylint-dev/astroid/blob/main/LICENSE
3# Copyright (c) https://github.com/pylint-dev/astroid/blob/main/CONTRIBUTORS.txt
4
5from astroid.bases import BoundMethod
6from astroid.brain.helpers import register_module_extender
7from astroid.builder import parse
8from astroid.exceptions import InferenceError
9from astroid.manager import AstroidManager
10from astroid.nodes.scoped_nodes import FunctionDef
11
12
def _multiprocessing_transform():
    """Build the stub module registered as the ``multiprocessing`` extender.

    The stub always defines ``Manager()``. When instances of
    ``DefaultContext`` and ``BaseContext`` can both be inferred, every
    public name found in their ``locals`` is copied onto the stub as well,
    with functions wrapped in ``BoundMethod`` so they behave like methods
    already bound to the context instance.

    Returns:
        The parsed stub module. If inference of either context fails, the
        stub is returned with only ``Manager`` defined.
    """
    stub = parse("""
    from multiprocessing.managers import SyncManager
    def Manager():
        return SyncManager()
    """)
    # multiprocessing resolves most of its public API through a getattr
    # lookup on context objects. That is too dynamic to follow statically,
    # so the contexts are instantiated in a scratch module and their
    # members are copied over by hand.
    scratch = parse("""
    from multiprocessing.context import DefaultContext, BaseContext
    default = DefaultContext()
    base = BaseContext()
    """)
    try:
        default_context = next(scratch["default"].infer())
        base_context = next(scratch["base"].infer())
    except (InferenceError, StopIteration):
        return stub

    for instance in (default_context, base_context):
        for name, definitions in instance.locals.items():
            if not name.startswith("_"):
                member = definitions[0]
                # Functions must be rebound to the instance, otherwise they
                # would still expect an explicit ``self`` argument.
                stub[name] = (
                    BoundMethod(member, instance)
                    if isinstance(member, FunctionDef)
                    else member
                )
    return stub
45
46
def _multiprocessing_managers_transform():
    """Build the stub module registered as the ``multiprocessing.managers`` extender.

    The stub source defines simplified stand-ins for ``Namespace``,
    ``Value``, ``Array`` and ``SyncManager``. The ``SyncManager`` stand-in
    aliases each factory attribute to the plain ``threading`` / ``queue`` /
    ``multiprocessing.pool`` / builtin object of the same name.

    ``Semaphore`` is included next to ``BoundedSemaphore`` because the real
    ``SyncManager`` documents both; without it the stub class has no
    ``Semaphore`` attribute at all.

    Returns:
        The module produced by parsing the stub source.
    """
    return parse("""
    import array
    import threading
    import multiprocessing.pool as pool
    import queue

    class Namespace(object):
        pass

    class Value(object):
        def __init__(self, typecode, value, lock=True):
            self._typecode = typecode
            self._value = value
        def get(self):
            return self._value
        def set(self, value):
            self._value = value
        def __repr__(self):
            return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
        value = property(get, set)

    def Array(typecode, sequence, lock=True):
        return array.array(typecode, sequence)

    class SyncManager(object):
        Queue = JoinableQueue = queue.Queue
        Event = threading.Event
        RLock = threading.RLock
        Lock = threading.Lock
        Semaphore = threading.Semaphore
        BoundedSemaphore = threading.BoundedSemaphore
        Condition = threading.Condition
        Barrier = threading.Barrier
        Pool = pool.Pool
        list = list
        dict = dict
        Value = Value
        Array = Array
        Namespace = Namespace
        __enter__ = lambda self: self
        __exit__ = lambda *args: args

        def start(self, initializer=None, initargs=None):
            pass
        def shutdown(self):
            pass
    """)
94
95
def register(manager: AstroidManager) -> None:
    """Register the multiprocessing module extenders on *manager*.

    ``multiprocessing.managers`` is registered first, followed by
    ``multiprocessing`` itself.
    """
    extenders = (
        ("multiprocessing.managers", _multiprocessing_managers_transform),
        ("multiprocessing", _multiprocessing_transform),
    )
    for module_name, build_stub in extenders:
        register_module_extender(manager, module_name, build_stub)