Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/externals/loky/backend/queues.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1############################################################################### 

2# Queue and SimpleQueue implementation for loky 

3# 

4# authors: Thomas Moreau, Olivier Grisel 

5# 

6# based on multiprocessing/queues.py (16/02/2017) 

7# * Add some custom reducers for the Queues/SimpleQueue to tweak the 

8# pickling process. (overload Queue._feed/SimpleQueue.put) 

9# 

10import os 

11import sys 

12import errno 

13import weakref 

14import threading 

15from multiprocessing import util 

16from multiprocessing.queues import ( 

17 Full, 

18 Queue as mp_Queue, 

19 SimpleQueue as mp_SimpleQueue, 

20 _sentinel, 

21) 

22from multiprocessing.context import assert_spawning 

23 

24from .reduction import dumps 

25 

26 

# Public names of this module. ``Full`` is re-exported unchanged from
# ``multiprocessing.queues``; ``Queue`` and ``SimpleQueue`` are the
# subclasses defined below.
__all__ = ["Queue", "SimpleQueue", "Full"]

28 

29 

class Queue(mp_Queue):
    """``multiprocessing`` queue whose feeder thread pickles via ``dumps``.

    The extra ``reducers`` argument is stored on the instance, carried
    through pickling, and handed to ``dumps`` for every object the feeder
    thread serializes.
    """

    def __init__(self, maxsize=0, reducers=None, ctx=None):
        super().__init__(maxsize=maxsize, ctx=ctx)
        # Passed to ``dumps(obj, reducers=...)`` by the feeder thread.
        self._reducers = reducers

    # Custom get/set state so the reducers travel with the pickled queue.
    def __getstate__(self):
        """Return the attribute tuple pickled for this queue.

        ``assert_spawning`` is called first, so pickling is only permitted
        in the contexts that helper accepts.
        """
        assert_spawning(self)
        state = (
            self._ignore_epipe,
            self._maxsize,
            self._reader,
            self._writer,
            self._reducers,
            self._rlock,
            self._wlock,
            self._sem,
            self._opid,
        )
        return state

    def __setstate__(self, state):
        """Restore the attributes produced by ``__getstate__``."""
        (
            ignore_epipe,
            maxsize,
            reader,
            writer,
            reducers,
            rlock,
            wlock,
            sem,
            opid,
        ) = state
        self._ignore_epipe = ignore_epipe
        self._maxsize = maxsize
        self._reader = reader
        self._writer = writer
        self._reducers = reducers
        self._rlock = rlock
        self._wlock = wlock
        self._sem = sem
        self._opid = opid
        # The parent-class re-initialisation hook is ``_reset`` on
        # Python >= 3.9 and ``_after_fork`` on older versions.
        if sys.version_info < (3, 9):
            self._after_fork()
        else:
            self._reset()

    # Overridden so that the thread target is our own ``_feed``.
    def _start_thread(self):
        """Start the daemon feeder thread and register its finalizers."""
        util.debug("Queue._start_thread()")

        # The feeder thread moves data from the buffer to the pipe.
        self._buffer.clear()
        feeder_args = (
            self._buffer,
            self._notempty,
            self._send_bytes,
            self._wlock,
            self._writer.close,
            self._reducers,
            self._ignore_epipe,
            self._on_queue_feeder_error,
            self._sem,
        )
        self._thread = threading.Thread(
            target=Queue._feed,
            args=feeder_args,
            name="QueueFeederThread",
            daemon=True,
        )

        util.debug("doing self._thread.start()")
        self._thread.start()
        util.debug("... done self._thread.start()")

        # At process exit we normally wait for the buffer to be flushed to
        # the pipe. When this process created the queue, every other user
        # of the queue is a descendant of it, so waiting for the flush is
        # pointless once all children have been joined.
        owned_by_current_process = self._opid == os.getpid()
        if not (self._joincancelled or owned_by_current_process):
            self._jointhread = util.Finalize(
                self._thread,
                Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5,
            )

        # When the queue is garbage collected, push the sentinel to the
        # feeder thread.
        self._close = util.Finalize(
            self,
            Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10,
        )

    # Overridden to serialize with our custom pickling strategy.
    @staticmethod
    def _feed(
        buffer,
        notempty,
        send_bytes,
        writelock,
        close,
        reducers,
        ignore_epipe,
        onerror,
        queue_sem,
    ):
        """Feeder-thread loop: pop from ``buffer``, pickle, write to the pipe.

        Returns after calling ``close()`` when the sentinel is popped, when
        an ``EPIPE`` error occurs and ``ignore_epipe`` is true, or when an
        error occurs while the process is exiting. Any other error
        releases ``queue_sem``, is reported through ``onerror(e, obj)``,
        and the loop continues.
        """
        util.debug("starting thread to feed data to pipe")
        lock_notempty = notempty.acquire
        unlock_notempty = notempty.release
        wait_notempty = notempty.wait
        pop_next = buffer.popleft
        stop_marker = _sentinel
        # The write lock is left unused on win32.
        if sys.platform == "win32":
            lock_writer = None
        else:
            lock_writer = writelock.acquire
            unlock_writer = writelock.release

        while True:
            try:
                lock_notempty()
                try:
                    if not buffer:
                        wait_notempty()
                finally:
                    unlock_notempty()
                try:
                    # Drain the buffer; IndexError signals it is empty.
                    while True:
                        obj = pop_next()
                        if obj is stop_marker:
                            util.debug("feeder thread got sentinel -- exiting")
                            close()
                            return

                        # Pickle first so the lock is held only for the write.
                        payload = dumps(obj, reducers=reducers)
                        if lock_writer is None:
                            send_bytes(payload)
                        else:
                            lock_writer()
                            try:
                                send_bytes(payload)
                            finally:
                                unlock_writer()
                        # Drop the references early to avoid leaking memory.
                        del obj, payload
                except IndexError:
                    pass
            except BaseException as e:
                if ignore_epipe and getattr(e, "errno", 0) == errno.EPIPE:
                    return
                # This is a daemon thread, so the resources it relies on
                # may become unusable while the process cleans up. Errors
                # raised once cleanup has started are only logged.
                if util.is_exiting():
                    util.info(f"error in queue thread: {e}")
                    return
                # Release the queue semaphore, then report the failure
                # together with the object being processed.
                queue_sem.release()
                onerror(e, obj)

    def _on_queue_feeder_error(self, e, obj):
        """
        Private hook invoked when the background feeder thread raises
        while feeding data. Meant to be overridden (e.g. by
        concurrent.futures); the default prints the current traceback.
        """
        from traceback import print_exc

        print_exc()

194 

195 

class SimpleQueue(mp_SimpleQueue):
    """``multiprocessing`` SimpleQueue that pickles with custom reducers.

    The extra ``reducers`` argument is stored on the instance, carried
    through pickling, and handed to ``dumps`` by ``put``.
    """

    def __init__(self, reducers=None, ctx=None):
        super().__init__(ctx=ctx)

        # Add possibility to use custom reducers
        self._reducers = reducers

    def close(self):
        """Close both ends of the underlying pipe."""
        self._reader.close()
        self._writer.close()

    # Use custom queue set/get state to be able to reduce the custom reducers
    def __getstate__(self):
        """Return the attribute tuple pickled for this queue.

        ``assert_spawning`` is called first, so pickling is only permitted
        in the contexts that helper accepts.
        """
        assert_spawning(self)
        return (
            self._reader,
            self._writer,
            self._reducers,
            self._rlock,
            self._wlock,
        )

    def __setstate__(self, state):
        """Restore the attributes produced by ``__getstate__``."""
        (
            self._reader,
            self._writer,
            self._reducers,
            self._rlock,
            self._wlock,
        ) = state
        # ``__init__`` is bypassed on unpickling, so rebind ``_poll`` here
        # the way the parent class does; the inherited ``empty()`` calls
        # it and would otherwise raise AttributeError on a restored queue.
        self._poll = self._reader.poll

    # Overload put to use our customizable reducer
    def put(self, obj):
        """Pickle ``obj`` with the configured reducers and write it to the pipe."""
        # serialize the data before acquiring the lock
        obj = dumps(obj, reducers=self._reducers)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

234 else: 

235 with self._wlock: 

236 self._writer.send_bytes(obj)