1"""Custom implementation of multiprocessing.Pool with custom pickler. 

2 

3This module provides efficient ways of working with data stored in 

4shared memory with numpy.memmap arrays without inducing any memory 

5copy between the parent and child processes. 

6 

7This module should not be imported if multiprocessing is not 

8available as it implements subclasses of multiprocessing Pool 

9that uses a custom alternative to SimpleQueue. 

10 

11""" 

12# Author: Olivier Grisel <olivier.grisel@ensta.org> 

13# Copyright: 2012, Olivier Grisel 

14# License: BSD 3 clause 

15 

16import copyreg 

17import sys 

18import warnings 

19from time import sleep 

20 

21try: 

22 WindowsError 

23except NameError: 

24 WindowsError = type(None) 

25 

26from pickle import Pickler 

27 

28from pickle import HIGHEST_PROTOCOL 

29from io import BytesIO 

30 

31from ._memmapping_reducer import get_memmapping_reducers 

32from ._memmapping_reducer import TemporaryResourcesManager 

33from ._multiprocessing_helpers import mp, assert_spawning 

34 

35# We need the class definition to derive from it, not the multiprocessing.Pool 

36# factory function 

37from multiprocessing.pool import Pool 

38 

39try: 

40 import numpy as np 

41except ImportError: 

42 np = None 

43 

44 

45############################################################################### 

46# Enable custom pickling in Pool queues 

47 

48class CustomizablePickler(Pickler): 

49 """Pickler that accepts custom reducers. 

50 

51 TODO python2_drop : can this be simplified ? 

52 

53 HIGHEST_PROTOCOL is selected by default as this pickler is used 

54 to pickle ephemeral datastructures for interprocess communication 

55 hence no backward compatibility is required. 

56 

57 `reducers` is expected to be a dictionary with key/values 

58 being `(type, callable)` pairs where `callable` is a function that 

59 give an instance of `type` will return a tuple `(constructor, 

60 tuple_of_objects)` to rebuild an instance out of the pickled 

61 `tuple_of_objects` as would return a `__reduce__` method. See the 

62 standard library documentation on pickling for more details. 

63 

64 """ 

65 

66 # We override the pure Python pickler as its the only way to be able to 

67 # customize the dispatch table without side effects in Python 2.7 

68 # to 3.2. For Python 3.3+ leverage the new dispatch_table 

69 # feature from https://bugs.python.org/issue14166 that makes it possible 

70 # to use the C implementation of the Pickler which is faster. 

71 

72 def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL): 

73 Pickler.__init__(self, writer, protocol=protocol) 

74 if reducers is None: 

75 reducers = {} 

76 if hasattr(Pickler, 'dispatch'): 

77 # Make the dispatch registry an instance level attribute instead of 

78 # a reference to the class dictionary under Python 2 

79 self.dispatch = Pickler.dispatch.copy() 

80 else: 

81 # Under Python 3 initialize the dispatch table with a copy of the 

82 # default registry 

83 self.dispatch_table = copyreg.dispatch_table.copy() 

84 for type, reduce_func in reducers.items(): 

85 self.register(type, reduce_func) 

86 

87 def register(self, type, reduce_func): 

88 """Attach a reducer function to a given type in the dispatch table.""" 

89 if hasattr(Pickler, 'dispatch'): 

90 # Python 2 pickler dispatching is not explicitly customizable. 

91 # Let us use a closure to workaround this limitation. 

92 def dispatcher(self, obj): 

93 reduced = reduce_func(obj) 

94 self.save_reduce(obj=obj, *reduced) 

95 self.dispatch[type] = dispatcher 

96 else: 

97 self.dispatch_table[type] = reduce_func 
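

# A minimal usage sketch of the reducer mechanism (illustrative only, not
# part of joblib's public API): the hypothetical reducer below serializes
# every `set` as a sorted list and lets the receiving side rebuild the set
# by calling the `set` constructor on it.
def _example_customizable_pickler():  # pragma: no cover
    import pickle
    buffer = BytesIO()
    # The reducer returns a `(constructor, args)` pair, exactly as a
    # `__reduce__` method would.
    reducers = {set: lambda obj: (set, (sorted(obj),))}
    CustomizablePickler(buffer, reducers).dump({3, 1, 2})
    assert pickle.loads(buffer.getvalue()) == {1, 2, 3}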


class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory-mapped datastructures.

    `reducers` is expected to be a dict with key/values being
    `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as a
    `__reduce__` method would.

    See the standard library documentation on pickling for more details.
    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock,
                self._reducers)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock,
         self._reducers) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:
            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())
            self._send = send
        else:
            self._send = send = self._writer.send
        if self._wlock is None:
            # Writes to a message oriented win32 pipe are atomic.
            self.put = send
        else:
            wlock_acquire, wlock_release = (
                self._wlock.acquire, self._wlock.release)

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put
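

# A minimal single-process sketch (illustrative only): objects `put` on the
# queue are serialized with the custom reducers and come back out of `get`
# rebuilt. The module-level `mp` is used as the context, mirroring what
# `PicklingPool._setup_queues` does below.
def _example_pickling_queue():  # pragma: no cover
    reducers = {set: lambda obj: (set, (sorted(obj),))}
    queue = CustomizablePicklingQueue(mp, reducers)
    queue.put({3, 1, 2})
    assert queue.get() == {1, 2, 3}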


class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as a `__reduce__` method would.
    See the standard library documentation on pickling for more details.

    """

    def __init__(self, processes=None, forward_reducers=None,
                 backward_reducers=None, **kwargs):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, '_ctx', mp)
        self._inqueue = CustomizablePicklingQueue(
            context, self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(
            context, self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
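

# A minimal usage sketch (illustrative only): task arguments are shipped to
# the workers through the hypothetical `set` reducer; plain picklable types
# would travel the same way with or without it.
def _example_pickling_pool():  # pragma: no cover
    forward_reducers = {set: lambda obj: (set, (sorted(obj),))}
    pool = PicklingPool(processes=2, forward_reducers=forward_reducers)
    try:
        assert pool.map(len, [{1, 2}, {1, 2, 3}]) == [2, 3]
    finally:
        pool.terminate()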


class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode, except for the 'w+' mode that is automatically
    transformed into 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore, large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: str or callable, optional
        If str:
            Folder to be used by the pool for memmapping large arrays
            for sharing memory with worker processes. If None, this will
            try in order:
            - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
              variable,
            - /dev/shm if the folder exists and is writable: this is a
              RAM disk filesystem available by default on modern Linux
              distributions,
            - the default system temporary folder that can be overridden
              with the TMP, TMPDIR or TEMP environment variables,
              typically /tmp under Unix operating systems.
        If callable:
            A callable in charge of dynamically resolving a temporary
            folder for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from the main process to
        worker processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        main process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocesses is handled (pickling or memmapping).
    prewarm: bool or str, optional, "auto" by default
        If True, force a read on newly memmapped arrays to make sure that
        the OS pre-caches them in memory. This can be useful to avoid
        concurrent disk access when the same data array is passed to
        different worker processes. If "auto" (the default), prewarm is
        set to True unless the Linux shared memory partition /dev/shm is
        available and used as temp folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will
    return a tuple `(constructor, tuple_of_objects)` to rebuild an
    instance out of the pickled `tuple_of_objects` as a `__reduce__`
    method would. See the standard library documentation on pickling for
    more details.

    """

    def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6,
                 mmap_mode='r', forward_reducers=None, backward_reducers=None,
                 verbose=0, context_id=None, prewarm=False, **kwargs):

        if context_id is not None:
            warnings.warn('context_id is deprecated and ignored in joblib'
                          ' 0.9.4 and will be removed in 0.11',
                          DeprecationWarning)

        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager

        # The usage of a temp_folder_resolver over a simple temp_folder is
        # superfluous for multiprocessing pools, as they don't get reused,
        # see get_memmapping_executor for more details. We still use it for
        # code simplicity.
        forward_reducers, backward_reducers = get_memmapping_reducers(
            temp_folder_resolver=manager.resolve_temp_folder_name,
            max_nbytes=max_nbytes, mmap_mode=mmap_mode,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers, verbose=verbose,
            unlink_on_gc_collect=False, prewarm=prewarm)

        poolargs = dict(
            processes=processes,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers)
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(**poolargs)

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Workaround occasional "[Error 5] Access is denied"
                    # issue when trying to terminate a process under
                    # windows.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn("Failed to terminate worker processes"
                                      " in multiprocessing pool: %r" % e)

        # Clean up the temporary resources as the workers should now be off.
        self._temp_folder_manager._clean_temporary_resources()

    @property
    def _temp_folder(self):
        # Legacy property used in tests. Could be removed if we refactored
        # the memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # We cache this property because it is called late in the tests: at
        # that point, all contexts have been unregistered and
        # resolve_temp_folder_name raises an error.
        if getattr(self, '_cached_temp_folder', None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = \
                self._temp_folder_manager.resolve_temp_folder_name()
            return self._cached_temp_folder
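

# A minimal usage sketch (illustrative only, requires numpy): arrays larger
# than `max_nbytes` are dumped once to a temporary memmap shared by all the
# workers instead of being pickled and copied for every task; `terminate`
# collects the temporary folder.
def _example_memmapping_pool():  # pragma: no cover
    if np is None:
        return
    data = np.ones(int(2e5))  # ~1.6 MB of float64, above the 1e6 threshold
    pool = MemmappingPool(processes=2, max_nbytes=1e6)
    try:
        # Each task receives a read-only, file-backed view of `data`
        # rather than a private in-memory copy.
        results = pool.map(np.sum, [data] * 4)
    finally:
        pool.terminate()
    assert results == [data.sum()] * 4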