1###############################################################################
2# Queue and SimpleQueue implementation for loky
3#
4# authors: Thomas Moreau, Olivier Grisel
5#
6# based on multiprocessing/queues.py (16/02/2017)
7# * Add some custom reducers for the Queues/SimpleQueue to tweak the
8# pickling process. (overload Queue._feed/SimpleQueue.put)
9#
10import os
11import sys
12import errno
13import weakref
14import threading
15from multiprocessing import util
16from multiprocessing.queues import (
17 Full,
18 Queue as mp_Queue,
19 SimpleQueue as mp_SimpleQueue,
20 _sentinel,
21)
22from multiprocessing.context import assert_spawning
23
24from .reduction import dumps
25
26
27__all__ = ["Queue", "SimpleQueue", "Full"]
28
29
class Queue(mp_Queue):
    """Multiprocessing ``Queue`` with support for custom pickling reducers.

    Behaves like ``multiprocessing.queues.Queue`` except that the background
    feeder thread serializes objects with ``loky``'s ``dumps`` and the
    optional ``reducers`` mapping given at construction time, instead of the
    default ``ForkingPickler``.
    """

    def __init__(self, maxsize=0, reducers=None, ctx=None):
        super().__init__(maxsize=maxsize, ctx=ctx)
        # Mapping of custom reducers, forwarded to dumps() by _feed().
        self._reducers = reducers

    # Use custom queue set/get state to be able to reduce the custom reducers
    def __getstate__(self):
        # Only allow pickling while spawning a child process.
        assert_spawning(self)
        return (
            self._ignore_epipe,
            self._maxsize,
            self._reader,
            self._writer,
            self._reducers,  # extra field compared to the stdlib Queue state
            self._rlock,
            self._wlock,
            self._sem,
            self._opid,
        )

    def __setstate__(self, state):
        (
            self._ignore_epipe,
            self._maxsize,
            self._reader,
            self._writer,
            self._reducers,
            self._rlock,
            self._wlock,
            self._sem,
            self._opid,
        ) = state
        # Rebuild the unpicklable internal state (buffer, feeder-thread
        # bookkeeping). The stdlib helper was renamed from _after_fork to
        # _reset in Python 3.9, hence the version switch.
        if sys.version_info >= (3, 9):
            self._reset()
        else:
            self._after_fork()

    # Overload _start_thread to correctly call our custom _feed
    def _start_thread(self):
        """Start the feeder thread, targeting Queue._feed instead of the
        stdlib feeder so that self._reducers is used for serialization."""
        util.debug("Queue._start_thread()")

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(
                self._buffer,
                self._notempty,
                self._send_bytes,
                self._wlock,
                self._writer.close,
                self._reducers,
                self._ignore_epipe,
                self._on_queue_feeder_error,
                self._sem,
            ),
            name="QueueFeederThread",
        )
        self._thread.daemon = True

        util.debug("doing self._thread.start()")
        self._thread.start()
        util.debug("... done self._thread.start()")

        # On process exit we will wait for data to be flushed to pipe.
        #
        # However, if this process created the queue then all
        # processes which use the queue will be descendants of this
        # process. Therefore waiting for the queue to be flushed
        # is pointless once all the child processes have been joined.
        created_by_this_process = self._opid == os.getpid()
        if not self._joincancelled and not created_by_this_process:
            # At exit, join the feeder thread (via a weakref so the Finalize
            # itself does not keep the thread alive).
            self._jointhread = util.Finalize(
                self._thread,
                Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5,
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = util.Finalize(
            self,
            Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10,
        )

    # Overload the _feed methods to use our custom pickling strategy.
    @staticmethod
    def _feed(
        buffer,
        notempty,
        send_bytes,
        writelock,
        close,
        reducers,
        ignore_epipe,
        onerror,
        queue_sem,
    ):
        """Feeder-thread loop: pop objects from ``buffer``, pickle them with
        ``dumps(obj, reducers=reducers)`` and push the bytes to the pipe.

        Runs in a daemon thread until the ``_sentinel`` object is seen or the
        process starts shutting down.
        """
        util.debug("starting thread to feed data to pipe")
        # Bind bound methods to locals: this loop is hot and these lookups
        # happen on every iteration.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != "win32":
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            # Writes to a message-oriented win32 pipe are atomic, so no
            # write lock is needed (signalled by wacquire is None below).
            wacquire = None

        while True:
            try:
                # Wait under the notempty condition until the buffer has data.
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer until it is empty (IndexError below).
                    while True:
                        obj = bpopleft()
                        if obj is sentinel:
                            util.debug("feeder thread got sentinel -- exiting")
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj_ = dumps(obj, reducers=reducers)
                        if wacquire is None:
                            send_bytes(obj_)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj_)
                            finally:
                                wrelease()
                        # Remove references early to avoid leaking memory
                        del obj, obj_
                except IndexError:
                    # Buffer exhausted; go back to waiting on notempty.
                    pass
            except BaseException as e:
                if ignore_epipe and getattr(e, "errno", 0) == errno.EPIPE:
                    # Reader end is gone and the caller asked to ignore it.
                    return
                # Since this runs in a daemon thread the resources it uses
                # may be become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if util.is_exiting():
                    util.info(f"error in queue thread: {e}")
                    return
                else:
                    # The object was popped from the buffer but never sent:
                    # release the slot its put() reserved on the bounded
                    # semaphore so the queue size stays consistent.
                    queue_sem.release()
                    # NOTE(review): if the exception fired before the first
                    # bpopleft(), `obj` is unbound here — same as upstream
                    # CPython; confirm before relying on onerror's 2nd arg.
                    onerror(e, obj)

    def _on_queue_feeder_error(self, e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception. For overriding by concurrent.futures.
        """
        import traceback

        traceback.print_exc()
194
195
class SimpleQueue(mp_SimpleQueue):
    """``multiprocessing.SimpleQueue`` whose ``put`` pickles objects with
    loky's ``dumps`` and an optional mapping of custom reducers."""

    def __init__(self, reducers=None, ctx=None):
        super().__init__(ctx=ctx)

        # Optional custom reducers, consumed by dumps() inside put().
        self._reducers = reducers

    def close(self):
        """Close both ends of the underlying pipe."""
        self._reader.close()
        self._writer.close()

    # Custom (de)serialization of the queue itself so that the reducers
    # travel along when the queue is sent to a spawned child process.
    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._reducers,
                self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._reducers,
         self._rlock, self._wlock) = state

    def put(self, obj):
        """Pickle ``obj`` with the custom reducers and send it on the pipe."""
        # Serialize before taking the write lock to keep the critical
        # section as short as possible.
        payload = dumps(obj, reducers=self._reducers)
        if self._wlock is None:
            # Writes to a message-oriented win32 pipe are atomic.
            self._writer.send_bytes(payload)
            return
        with self._wlock:
            self._writer.send_bytes(payload)