Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/multiprocessing/context.py: 1%

210 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import os 

2import sys 

3import threading 

4 

5from . import process 

6from . import reduction 

7 

__all__ = ()   # deliberately empty: the public names are re-exported via __all__ of multiprocessing/__init__.py

9 

10# 

11# Exceptions 

12# 

13 

class ProcessError(Exception):
    '''Base class for all exceptions raised by the multiprocessing package.'''


class BufferTooShort(ProcessError):
    '''Raised when a supplied buffer is too small for the received message.'''


class TimeoutError(ProcessError):
    '''Raised when a blocking operation times out.'''


class AuthenticationError(ProcessError):
    '''Raised when there is a connection authentication error.'''

25 

26# 

27# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py 

28# 

29 

class BaseContext(object):
    '''Base type for multiprocessing contexts.

    Bound methods of an instance of this type are included in __all__ of
    multiprocessing/__init__.py, so every factory defined here doubles as a
    module-level API of the package.
    '''

    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Return the number of CPUs in the system.'''
        count = os.cpu_count()
        if count is None:
            raise NotImplementedError('cannot determine number of cpus')
        return count

    def Manager(self):
        '''Return a manager associated with a running server process.

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        manager = SyncManager(ctx=self.get_context())
        manager.start()
        return manager

    def Pipe(self, duplex=True):
        '''Return a pair of connection objects joined by a pipe.'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Return a non-recursive lock object.'''
        from .synchronize import Lock
        ctx = self.get_context()
        return Lock(ctx=ctx)

    def RLock(self):
        '''Return a recursive lock object.'''
        from .synchronize import RLock
        ctx = self.get_context()
        return RLock(ctx=ctx)

    def Condition(self, lock=None):
        '''Return a condition object.'''
        from .synchronize import Condition
        ctx = self.get_context()
        return Condition(lock, ctx=ctx)

    def Semaphore(self, value=1):
        '''Return a semaphore object.'''
        from .synchronize import Semaphore
        ctx = self.get_context()
        return Semaphore(value, ctx=ctx)

    def BoundedSemaphore(self, value=1):
        '''Return a bounded semaphore object.'''
        from .synchronize import BoundedSemaphore
        ctx = self.get_context()
        return BoundedSemaphore(value, ctx=ctx)

    def Event(self):
        '''Return an event object.'''
        from .synchronize import Event
        ctx = self.get_context()
        return Event(ctx=ctx)

    def Barrier(self, parties, action=None, timeout=None):
        '''Return a barrier object.'''
        from .synchronize import Barrier
        ctx = self.get_context()
        return Barrier(parties, action, timeout, ctx=ctx)

    def Queue(self, maxsize=0):
        '''Return a shared queue object.'''
        from .queues import Queue
        ctx = self.get_context()
        return Queue(maxsize, ctx=ctx)

    def JoinableQueue(self, maxsize=0):
        '''Return a joinable shared queue object.'''
        from .queues import JoinableQueue
        ctx = self.get_context()
        return JoinableQueue(maxsize, ctx=ctx)

    def SimpleQueue(self):
        '''Return a simplified shared queue object.'''
        from .queues import SimpleQueue
        ctx = self.get_context()
        return SimpleQueue(ctx=ctx)

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Return a process pool object.'''
        from .pool import Pool
        ctx = self.get_context()
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=ctx)

    def RawValue(self, typecode_or_type, *args):
        '''Return a shared ctypes object.'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Return a shared ctypes array.'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Return a synchronized shared object.'''
        from .sharedctypes import Value
        ctx = self.get_context()
        return Value(typecode_or_type, *args, lock=lock, ctx=ctx)

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Return a synchronized shared array.'''
        from .sharedctypes import Array
        ctx = self.get_context()
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=ctx)

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Only relevant for frozen executables on Windows.
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()

    def get_logger(self):
        '''Return the package logger, creating it on first use.'''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr.'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes.
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        from . import connection

    def set_executable(self, executable):
        '''Set the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        '''Return a context for *method*, or this context when method is None.

        Raises ValueError for an unknown or unavailable start method.
        '''
        if method is None:
            return self
        ctx = _concrete_contexts.get(method)
        if ctx is None:
            raise ValueError('cannot find context for %r' % method)
        # Let the concrete context reject itself (e.g. forkserver without
        # handle-passing support).
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        '''Return the name of this context's start method.'''
        # allow_none is accepted for interface compatibility; a concrete
        # context always has a name.
        return self._name

    def set_start_method(self, method, force=False):
        '''Concrete contexts are fixed to a single start method.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        globals()['reduction'] = reduction

    def _check_available(self):
        # Overridden by concrete contexts whose start method may be missing.
        pass

215 

216# 

217# Type of default context -- underlying context can be set at most once 

218# 

219 

class Process(process.BaseProcess):
    '''Process type used by the default context.'''
    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Delegate to whichever concrete context is currently active.
        ctx = _default_context.get_context()
        return ctx.Process._Popen(process_obj)

225 

class DefaultContext(BaseContext):
    '''Context whose underlying concrete context can be set at most once.'''
    Process = Process

    def __init__(self, context):
        # Platform default, used until/unless a start method is chosen.
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        '''Return the active context, or the concrete context for *method*.'''
        if method is not None:
            return super().get_context(method)
        if self._actual_context is None:
            # First use pins the platform default as the active context.
            self._actual_context = self._default_context
        return self._actual_context

    def set_start_method(self, method, force=False):
        '''Select the start method; allowed at most once unless forced.'''
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            # Forced reset back to the platform default.
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the active start method name, or None if unset and allowed.'''
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        '''Return the start methods supported on this platform.'''
        if sys.platform == 'win32':
            return ['spawn']
        methods = ['fork', 'spawn']
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods

264 

265# 

266# Context types for fixed start method 

267# 

268 

if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            # forkserver needs fd passing to hand work to the server process.
            if not reduction.HAVE_SEND_HANDLE:
                raise ValueError('forkserver start method not available')

    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }

    # bpo-33725: running arbitrary code after fork() is no longer reliable
    # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
    _default_context = DefaultContext(
        _concrete_contexts['spawn' if sys.platform == 'darwin' else 'fork'])

318 

319else: 

320 

321 class SpawnProcess(process.BaseProcess): 

322 _start_method = 'spawn' 

323 @staticmethod 

324 def _Popen(process_obj): 

325 from .popen_spawn_win32 import Popen 

326 return Popen(process_obj) 

327 

328 class SpawnContext(BaseContext): 

329 _name = 'spawn' 

330 Process = SpawnProcess 

331 

332 _concrete_contexts = { 

333 'spawn': SpawnContext(), 

334 } 

335 _default_context = DefaultContext(_concrete_contexts['spawn']) 

336 

337# 

338# Force the start method 

339# 

340 

def _force_start_method(method):
    '''Force the default context's start method, bypassing availability checks.

    Raises KeyError if *method* is not a known start method.
    '''
    ctx = _concrete_contexts[method]
    _default_context._actual_context = ctx

343 

344# 

345# Check that the current thread is spawning a child process 

346# 

347 

# Per-thread record of which Popen object (if any) this thread is currently
# spawning a child process for.
_tls = threading.local()

def get_spawning_popen():
    '''Return the Popen object the current thread is spawning for, or None.'''
    try:
        return _tls.spawning_popen
    except AttributeError:
        return None

def set_spawning_popen(popen):
    '''Record the Popen object the current thread is spawning for.'''
    _tls.spawning_popen = popen

355 

def assert_spawning(obj):
    '''Raise RuntimeError unless the current thread is spawning a child process.

    Used by picklable-by-inheritance-only types to reject ordinary pickling.
    '''
    popen = get_spawning_popen()
    if popen is None:
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(obj).__name__
            )