/src/libzmq/src/thread.cpp
Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | #include "thread.hpp" |
6 | | #include "err.hpp" |
7 | | |
8 | | #ifdef ZMQ_HAVE_WINDOWS |
9 | | #include <winnt.h> |
10 | | #endif |
11 | | |
12 | | #ifdef __MINGW32__ |
13 | | #include "pthread.h" |
14 | | #endif |
15 | | |
16 | | bool zmq::thread_t::get_started () const |
17 | 0 | { |
18 | 0 | return _started; |
19 | 0 | } |
20 | | |
21 | | #ifdef ZMQ_HAVE_WINDOWS |
22 | | |
23 | | extern "C" { |
24 | | #if defined _WIN32_WCE |
25 | | static DWORD thread_routine (LPVOID arg_) |
26 | | #else |
27 | | static unsigned int __stdcall thread_routine (void *arg_) |
28 | | #endif |
29 | | { |
30 | | zmq::thread_t *self = static_cast<zmq::thread_t *> (arg_); |
31 | | self->applyThreadName (); |
32 | | self->_tfn (self->_arg); |
33 | | return 0; |
34 | | } |
35 | | } |
36 | | |
37 | | void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_) |
38 | | { |
39 | | _tfn = tfn_; |
40 | | _arg = arg_; |
41 | | if (name_) |
42 | | strncpy (_name, name_, sizeof (_name) - 1); |
43 | | |
44 | | // set default stack size to 4MB to avoid std::map stack overflow on x64 |
45 | | unsigned int stack = 0; |
46 | | #if defined _WIN64 |
47 | | stack = 0x400000; |
48 | | #endif |
49 | | |
50 | | #if defined _WIN32_WCE |
51 | | _descriptor = (HANDLE) CreateThread (NULL, stack, &::thread_routine, this, |
52 | | 0, &_thread_id); |
53 | | #else |
54 | | _descriptor = (HANDLE) _beginthreadex (NULL, stack, &::thread_routine, this, |
55 | | 0, &_thread_id); |
56 | | #endif |
57 | | win_assert (_descriptor != NULL); |
58 | | _started = true; |
59 | | } |
60 | | |
61 | | bool zmq::thread_t::is_current_thread () const |
62 | | { |
63 | | return GetCurrentThreadId () == _thread_id; |
64 | | } |
65 | | |
66 | | void zmq::thread_t::stop () |
67 | | { |
68 | | if (_started) { |
69 | | const DWORD rc = WaitForSingleObject (_descriptor, INFINITE); |
70 | | win_assert (rc != WAIT_FAILED); |
71 | | const BOOL rc2 = CloseHandle (_descriptor); |
72 | | win_assert (rc2 != 0); |
73 | | } |
74 | | } |
75 | | |
76 | | void zmq::thread_t::setSchedulingParameters ( |
77 | | int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_) |
78 | | { |
79 | | // not implemented |
80 | | LIBZMQ_UNUSED (priority_); |
81 | | LIBZMQ_UNUSED (scheduling_policy_); |
82 | | LIBZMQ_UNUSED (affinity_cpus_); |
83 | | } |
84 | | |
85 | | void zmq::thread_t:: |
86 | | applySchedulingParameters () // to be called in secondary thread context |
87 | | { |
88 | | // not implemented |
89 | | } |
90 | | |
91 | | #ifdef _MSC_VER |
92 | | |
93 | | namespace |
94 | | { |
95 | | #pragma pack(push, 8) |
96 | | struct thread_info_t |
97 | | { |
98 | | DWORD _type; |
99 | | LPCSTR _name; |
100 | | DWORD _thread_id; |
101 | | DWORD _flags; |
102 | | }; |
103 | | #pragma pack(pop) |
104 | | } |
105 | | |
106 | | #endif |
107 | | |
108 | | void zmq::thread_t:: |
109 | | applyThreadName () // to be called in secondary thread context |
110 | | { |
111 | | if (!_name[0] || !IsDebuggerPresent ()) |
112 | | return; |
113 | | |
114 | | #ifdef _MSC_VER |
115 | | |
116 | | thread_info_t thread_info; |
117 | | thread_info._type = 0x1000; |
118 | | thread_info._name = _name; |
119 | | thread_info._thread_id = -1; |
120 | | thread_info._flags = 0; |
121 | | |
122 | | __try { |
123 | | const DWORD MS_VC_EXCEPTION = 0x406D1388; |
124 | | RaiseException (MS_VC_EXCEPTION, 0, |
125 | | sizeof (thread_info) / sizeof (ULONG_PTR), |
126 | | (ULONG_PTR *) &thread_info); |
127 | | } |
128 | | __except (EXCEPTION_CONTINUE_EXECUTION) { |
129 | | } |
130 | | |
131 | | #elif defined(__MINGW32__) |
132 | | |
133 | | int rc = pthread_setname_np (pthread_self (), _name); |
134 | | if (rc) |
135 | | return; |
136 | | |
137 | | #else |
138 | | |
139 | | // not implemented |
140 | | |
141 | | #endif |
142 | | } |
143 | | |
144 | | |
145 | | #elif defined ZMQ_HAVE_VXWORKS |
146 | | |
147 | | extern "C" { |
148 | | static void *thread_routine (void *arg_) |
149 | | { |
150 | | zmq::thread_t *self = (zmq::thread_t *) arg_; |
151 | | self->applySchedulingParameters (); |
152 | | self->_tfn (self->_arg); |
153 | | return NULL; |
154 | | } |
155 | | } |
156 | | |
157 | | void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_) |
158 | | { |
159 | | LIBZMQ_UNUSED (name_); |
160 | | _tfn = tfn_; |
161 | | _arg = arg_; |
162 | | _descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS, |
163 | | DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine, |
164 | | (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0); |
165 | | if (_descriptor != NULL || _descriptor > 0) |
166 | | _started = true; |
167 | | } |
168 | | |
169 | | void zmq::thread_t::stop () |
170 | | { |
171 | | if (_started) |
172 | | while ((_descriptor != NULL || _descriptor > 0) |
173 | | && taskIdVerify (_descriptor) == 0) { |
174 | | } |
175 | | } |
176 | | |
177 | | bool zmq::thread_t::is_current_thread () const |
178 | | { |
179 | | return taskIdSelf () == _descriptor; |
180 | | } |
181 | | |
182 | | void zmq::thread_t::setSchedulingParameters ( |
183 | | int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_) |
184 | | { |
185 | | _thread_priority = priority_; |
186 | | _thread_sched_policy = schedulingPolicy_; |
187 | | _thread_affinity_cpus = affinity_cpus_; |
188 | | } |
189 | | |
190 | | void zmq::thread_t:: |
191 | | applySchedulingParameters () // to be called in secondary thread context |
192 | | { |
193 | | int priority = |
194 | | (_thread_priority >= 0 ? _thread_priority : DEFAULT_PRIORITY); |
195 | | priority = (priority < UCHAR_MAX ? priority : DEFAULT_PRIORITY); |
196 | | if (_descriptor != NULL || _descriptor > 0) { |
197 | | taskPrioritySet (_descriptor, priority); |
198 | | } |
199 | | } |
200 | | |
201 | | void zmq::thread_t:: |
202 | | applyThreadName () // to be called in secondary thread context |
203 | | { |
204 | | // not implemented |
205 | | } |
206 | | |
207 | | #else |
208 | | |
209 | | #include <signal.h> |
210 | | #include <unistd.h> |
211 | | #include <sys/time.h> |
212 | | #include <sys/resource.h> |
213 | | |
214 | | extern "C" { |
215 | | static void *thread_routine (void *arg_) |
216 | 0 | { |
217 | 0 | #if !defined ZMQ_HAVE_OPENVMS && !defined ZMQ_HAVE_ANDROID |
218 | | // Following code will guarantee more predictable latencies as it'll |
219 | | // disallow any signal handling in the I/O thread. |
220 | 0 | sigset_t signal_set; |
221 | 0 | int rc = sigfillset (&signal_set); |
222 | 0 | errno_assert (rc == 0); |
223 | 0 | rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL); |
224 | 0 | posix_assert (rc); |
225 | 0 | #endif |
226 | 0 | zmq::thread_t *self = (zmq::thread_t *) arg_; |
227 | 0 | self->applySchedulingParameters (); |
228 | 0 | self->applyThreadName (); |
229 | 0 | self->_tfn (self->_arg); |
230 | 0 | return NULL; |
231 | 0 | } |
232 | | } |
233 | | |
234 | | void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_) |
235 | 0 | { |
236 | 0 | _tfn = tfn_; |
237 | 0 | _arg = arg_; |
238 | 0 | if (name_) |
239 | 0 | strncpy (_name, name_, sizeof (_name) - 1); |
240 | 0 | int rc = pthread_create (&_descriptor, NULL, thread_routine, this); |
241 | 0 | posix_assert (rc); |
242 | 0 | _started = true; |
243 | 0 | } |
244 | | |
245 | | void zmq::thread_t::stop () |
246 | 0 | { |
247 | 0 | if (_started) { |
248 | 0 | int rc = pthread_join (_descriptor, NULL); |
249 | 0 | posix_assert (rc); |
250 | 0 | } |
251 | 0 | } |
252 | | |
253 | | bool zmq::thread_t::is_current_thread () const |
254 | 0 | { |
255 | 0 | return bool (pthread_equal (pthread_self (), _descriptor)); |
256 | 0 | } |
257 | | |
258 | | void zmq::thread_t::setSchedulingParameters ( |
259 | | int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_) |
260 | 0 | { |
261 | 0 | _thread_priority = priority_; |
262 | 0 | _thread_sched_policy = scheduling_policy_; |
263 | 0 | _thread_affinity_cpus = affinity_cpus_; |
264 | 0 | } |
265 | | |
266 | | void zmq::thread_t:: |
267 | | applySchedulingParameters () // to be called in secondary thread context |
268 | 0 | { |
269 | 0 | #if defined _POSIX_THREAD_PRIORITY_SCHEDULING \ |
270 | 0 | && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0 |
271 | 0 | int policy = 0; |
272 | 0 | struct sched_param param; |
273 | |
|
274 | | #if _POSIX_THREAD_PRIORITY_SCHEDULING == 0 \ |
275 | | && defined _SC_THREAD_PRIORITY_SCHEDULING |
276 | | if (sysconf (_SC_THREAD_PRIORITY_SCHEDULING) < 0) { |
277 | | return; |
278 | | } |
279 | | #endif |
280 | 0 | int rc = pthread_getschedparam (pthread_self (), &policy, ¶m); |
281 | 0 | posix_assert (rc); |
282 | |
|
283 | 0 | if (_thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT) { |
284 | 0 | policy = _thread_sched_policy; |
285 | 0 | } |
286 | | |
287 | | /* Quoting docs: |
288 | | "Linux allows the static priority range 1 to 99 for the SCHED_FIFO and |
289 | | SCHED_RR policies, and the priority 0 for the remaining policies." |
290 | | Other policies may use the "nice value" in place of the priority: |
291 | | */ |
292 | 0 | bool use_nice_instead_priority = |
293 | 0 | (policy != SCHED_FIFO) && (policy != SCHED_RR); |
294 | |
|
295 | 0 | if (use_nice_instead_priority) |
296 | 0 | param.sched_priority = |
297 | 0 | 0; // this is the only supported priority for most scheduling policies |
298 | 0 | else if (_thread_priority != ZMQ_THREAD_PRIORITY_DFLT) |
299 | 0 | param.sched_priority = |
300 | 0 | _thread_priority; // user should provide a value between 1 and 99 |
301 | |
|
302 | | #ifdef __NetBSD__ |
303 | | if (policy == SCHED_OTHER) |
304 | | param.sched_priority = -1; |
305 | | #endif |
306 | |
|
307 | 0 | rc = pthread_setschedparam (pthread_self (), policy, ¶m); |
308 | |
|
309 | | #if defined(__FreeBSD_kernel__) || defined(__FreeBSD__) |
310 | | // If this feature is unavailable at run-time, don't abort. |
311 | | if (rc == ENOSYS) |
312 | | return; |
313 | | #endif |
314 | |
|
315 | 0 | posix_assert (rc); |
316 | |
|
317 | 0 | #if !defined ZMQ_HAVE_VXWORKS |
318 | 0 | if (use_nice_instead_priority |
319 | 0 | && _thread_priority != ZMQ_THREAD_PRIORITY_DFLT |
320 | 0 | && _thread_priority > 0) { |
321 | | // assume the user wants to decrease the thread's nice value |
322 | | // i.e., increase the chance of this thread being scheduled: try setting that to |
323 | | // maximum priority. |
324 | 0 | rc = nice (-20); |
325 | |
|
326 | 0 | errno_assert (rc != -1); |
327 | | // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because |
328 | | // CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM! |
329 | 0 | } |
330 | 0 | #endif |
331 | |
|
332 | 0 | #ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY |
333 | 0 | if (!_thread_affinity_cpus.empty ()) { |
334 | 0 | cpu_set_t cpuset; |
335 | 0 | CPU_ZERO (&cpuset); |
336 | 0 | for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin (), |
337 | 0 | end = _thread_affinity_cpus.end (); |
338 | 0 | it != end; it++) { |
339 | 0 | CPU_SET ((int) (*it), &cpuset); |
340 | 0 | } |
341 | 0 | rc = |
342 | 0 | pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset); |
343 | 0 | posix_assert (rc); |
344 | 0 | } |
345 | 0 | #endif |
346 | 0 | #endif |
347 | 0 | } |
348 | | |
349 | | void zmq::thread_t:: |
350 | | applyThreadName () // to be called in secondary thread context |
351 | 0 | { |
352 | | /* The thread name is a cosmetic string, added to ease debugging of |
353 | | * multi-threaded applications. It is not a big issue if this value |
354 | | * can not be set for any reason (such as Permission denied in some |
355 | | * cases where the application changes its EUID, etc.) The value of |
356 | | * "int rc" is retained where available, to help debuggers stepping |
357 | | * through code to see its value - but otherwise it is ignored. |
358 | | */ |
359 | 0 | if (!_name[0]) |
360 | 0 | return; |
361 | | |
362 | | /* Fails with permission denied on Android 5/6 */ |
363 | | #if defined(ZMQ_HAVE_ANDROID) |
364 | | return; |
365 | | #endif |
366 | | |
367 | | #if defined(ZMQ_HAVE_PTHREAD_SETNAME_1) |
368 | | int rc = pthread_setname_np (_name); |
369 | | if (rc) |
370 | | return; |
371 | | #elif defined(ZMQ_HAVE_PTHREAD_SETNAME_2) |
372 | 0 | int rc = pthread_setname_np (pthread_self (), _name); |
373 | 0 | if (rc) |
374 | 0 | return; |
375 | | #elif defined(ZMQ_HAVE_PTHREAD_SETNAME_3) |
376 | | int rc = pthread_setname_np (pthread_self (), _name, NULL); |
377 | | if (rc) |
378 | | return; |
379 | | #elif defined(ZMQ_HAVE_PTHREAD_SET_NAME) |
380 | | pthread_set_name_np (pthread_self (), _name); |
381 | | #endif |
382 | 0 | } |
383 | | |
384 | | #endif |