1"""Custom implementation of multiprocessing.Pool with custom pickler. 

2 

3This module provides efficient ways of working with data stored in 

4shared memory with numpy.memmap arrays without inducing any memory 

5copy between the parent and child processes. 

6 

7This module should not be imported if multiprocessing is not 

8available as it implements subclasses of multiprocessing Pool 

9that uses a custom alternative to SimpleQueue. 

10 

11""" 

12# Author: Olivier Grisel <olivier.grisel@ensta.org> 

13# Copyright: 2012, Olivier Grisel 

14# License: BSD 3 clause 

import copyreg
import sys
import warnings
from time import sleep

try:
    WindowsError
except NameError:
    WindowsError = type(None)

from io import BytesIO

# We need the class definition to derive from it, not the multiprocessing.Pool
# factory function
from multiprocessing.pool import Pool
from pickle import HIGHEST_PROTOCOL, Pickler

from ._memmapping_reducer import TemporaryResourcesManager, get_memmapping_reducers
from ._multiprocessing_helpers import assert_spawning, mp

try:
    import numpy as np
except ImportError:
    np = None


###############################################################################
# Enable custom pickling in Pool queues

class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.

    TODO python2_drop: can this be simplified?

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication,
    hence no backward compatibility is required.

    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as a `__reduce__` method would return. See the
    standard library documentation on pickling for more details.

    """

    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.7
    # to 3.2. For Python 3.3+ we leverage the new dispatch_table
    # feature from https://bugs.python.org/issue14166 that makes it possible
    # to use the C implementation of the Pickler, which is faster.

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        if hasattr(Pickler, "dispatch"):
            # Make the dispatch registry an instance-level attribute instead of
            # a reference to the class dictionary under Python 2
            self.dispatch = Pickler.dispatch.copy()
        else:
            # Under Python 3 initialize the dispatch table with a copy of the
            # default registry
            self.dispatch_table = copyreg.dispatch_table.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        """Attach a reducer function to a given type in the dispatch table."""
        if hasattr(Pickler, "dispatch"):
            # Python 2 pickler dispatching is not explicitly customizable.
            # Let us use a closure to work around this limitation.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)

            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func
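

# Illustrative sketch of the reducer contract documented above (hypothetical
# helper, not from joblib): a reducer maps an instance to the same
# ``(constructor, args)`` pair a ``__reduce__`` method would return. The
# builtin ``complex`` type is used here purely for illustration.
def _example_custom_reducer():
    buffer = BytesIO()
    reducers = {complex: lambda obj: (complex, (obj.real, obj.imag))}
    CustomizablePickler(buffer, reducers).dump(3 + 4j)
    return buffer.getvalue()  # pickled bytes, rebuilt via complex(3.0, 4.0)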

class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory mapped datastructures.

    `reducers` is expected to be a dict with key/values being
    `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as a
    `__reduce__` method would return.

    See the standard library documentation on pickling for more details.
    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == "win32":
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock, self._reducers)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock, self._reducers) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:

            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())

            self._send = send
        else:
            self._send = send = self._writer.send
        if self._wlock is None:
            # Writes to a message-oriented win32 pipe are atomic.
            self.put = send
        else:
            wlock_acquire, wlock_release = (self._wlock.acquire, self._wlock.release)

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put
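

# Illustrative sketch (hypothetical helper, not from joblib): round-trip a
# payload through the queue within a single process. In real use the queue is
# shared with worker processes, which is where the custom reducers pay off.
def _example_pickling_queue():
    context = mp.get_context()
    queue = CustomizablePicklingQueue(context, reducers=None)
    queue.put({"answer": 42})
    return queue.get()  # -> {'answer': 42}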

class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as a `__reduce__` method would return.
    See the standard library documentation about pickling for more details.

    """

    def __init__(
        self, processes=None, forward_reducers=None, backward_reducers=None, **kwargs
    ):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, "_ctx", mp)
        self._inqueue = CustomizablePicklingQueue(context, self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(context, self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
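

# Illustrative sketch (hypothetical helper, not from joblib): dispatch tasks
# through a PicklingPool whose forward reducer ships lists as the pair
# ``(list, (tuple(obj),))``, matching the contract documented above. Guard
# real code with ``if __name__ == "__main__"``.
def _example_pickling_pool():
    forward = {list: lambda obj: (list, (tuple(obj),))}
    with PicklingPool(processes=2, forward_reducers=forward) as pool:
        return pool.map(sorted, [[3, 1, 2], [9, 7, 8]])  # [[1, 2, 3], [7, 8, 9]]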

class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode, except for the 'w+' mode which is automatically
    transformed into 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore, large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: str or callable, optional
        If str:
          Folder to be used by the pool for memmapping large arrays
          for sharing memory with worker processes. If None, this will try in
          order:
          - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
            variable,
          - /dev/shm if the folder exists and is writable: this is a RAMdisk
            filesystem available by default on modern Linux distributions,
          - the default system temporary folder that can be overridden
            with TMP, TMPDIR or TEMP environment variables, typically /tmp
            under Unix operating systems.
        If callable:
          A callable in charge of dynamically resolving a temporary folder
          for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from the main process to worker
        processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        main process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocesses is handled (pickling or memmapping).
    prewarm: bool or str, optional, False by default
        If True, force a read on newly memmapped arrays to make sure that the
        OS pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes. If "auto", prewarm is set to True, unless the
        Linux shared memory partition /dev/shm is available and used as temp
        folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as a `__reduce__` method would
    return. See the standard library documentation on pickling for more
    details.

    """

    def __init__(
        self,
        processes=None,
        temp_folder=None,
        max_nbytes=1e6,
        mmap_mode="r",
        forward_reducers=None,
        backward_reducers=None,
        verbose=0,
        prewarm=False,
        **kwargs,
    ):
        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager

        # The usage of a temp_folder_resolver over a simple temp_folder is
        # superfluous for multiprocessing pools, as they don't get reused, see
        # get_memmapping_executor for more details. We still use it for code
        # simplicity.
        forward_reducers, backward_reducers = get_memmapping_reducers(
            temp_folder_resolver=manager.resolve_temp_folder_name,
            max_nbytes=max_nbytes,
            mmap_mode=mmap_mode,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers,
            verbose=verbose,
            unlink_on_gc_collect=False,
            prewarm=prewarm,
        )

        poolargs = dict(
            processes=processes,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers,
        )
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(**poolargs)

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Workaround occasional "[Error 5] Access is denied" issue
                    # when trying to terminate a process under Windows.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn(
                            "Failed to terminate worker processes in"
                            " multiprocessing pool: %r" % e
                        )

        # Clean up the temporary resources as the workers should now be off.
        self._temp_folder_manager._clean_temporary_resources()

    @property
    def _temp_folder(self):
        # Legacy property used in tests; could be removed if we refactored the
        # memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # We cache this property because it is called late in the tests - at
        # this point, all contexts have been unregistered, and
        # resolve_temp_folder_name raises an error.
        if getattr(self, "_cached_temp_folder", None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = (
                self._temp_folder_manager.resolve_temp_folder_name()
            )  # noqa
            return self._cached_temp_folder
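

# Minimal usage sketch (hypothetical helper, not from joblib), assuming numpy
# is installed: array chunks larger than ``max_nbytes`` are memmapped to a
# temporary folder instead of being copied to the workers. Guard real code
# with ``if __name__ == "__main__"``.
def _example_memmapping_pool():
    data = np.ones(int(1e6))  # ~8 MB of float64, well above the threshold
    pool = MemmappingPool(processes=2, max_nbytes=1e6, mmap_mode="r")
    try:
        results = pool.map(np.sum, [data[:500000], data[500000:]])
    finally:
        # Always terminate so the temporary memmap folder gets collected.
        pool.terminate()
    return sum(results)  # == data.sum() == 1e6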