Coverage Report

Created: 2026-01-25 06:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/thread.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "thread.hpp"
6
#include "err.hpp"
7
8
#ifdef ZMQ_HAVE_WINDOWS
9
#include <winnt.h>
10
#endif
11
12
#ifdef __MINGW32__
13
#include "pthread.h"
14
#endif
15
16
bool zmq::thread_t::get_started () const
17
0
{
18
0
    return _started;
19
0
}
20
21
#ifdef ZMQ_HAVE_WINDOWS
22
23
extern "C" {
24
#if defined _WIN32_WCE
25
static DWORD thread_routine (LPVOID arg_)
26
#else
27
static unsigned int __stdcall thread_routine (void *arg_)
28
#endif
29
{
30
    zmq::thread_t *self = static_cast<zmq::thread_t *> (arg_);
31
    self->applyThreadName ();
32
    self->_tfn (self->_arg);
33
    return 0;
34
}
35
}
36
37
//  Launches a new Windows thread that runs tfn_ (arg_).
//
//  tfn_   function executed by the new thread
//  arg_   opaque argument forwarded to tfn_
//  name_  optional thread name; when NULL the stored name is left untouched
//
//  Thread creation failure is reported through win_assert. On return the
//  object is marked as started.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
{
    _tfn = tfn_;
    _arg = arg_;
    if (name_) {
        //  strncpy does not write a terminator when name_ is at least
        //  sizeof (_name) - 1 characters long, so terminate explicitly
        //  instead of relying on the buffer's previous contents.
        strncpy (_name, name_, sizeof (_name) - 1);
        _name[sizeof (_name) - 1] = '\0';
    }

    // set default stack size to 4MB to avoid std::map stack overflow on x64
    unsigned int stack = 0;
#if defined _WIN64
    stack = 0x400000;
#endif

#if defined _WIN32_WCE
    _descriptor = (HANDLE) CreateThread (NULL, stack, &::thread_routine, this,
                                         0, &_thread_id);
#else
    _descriptor = (HANDLE) _beginthreadex (NULL, stack, &::thread_routine, this,
                                           0, &_thread_id);
#endif
    win_assert (_descriptor != NULL);
    _started = true;
}
60
61
bool zmq::thread_t::is_current_thread () const
62
{
63
    return GetCurrentThreadId () == _thread_id;
64
}
65
66
void zmq::thread_t::stop ()
67
{
68
    if (_started) {
69
        const DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
70
        win_assert (rc != WAIT_FAILED);
71
        const BOOL rc2 = CloseHandle (_descriptor);
72
        win_assert (rc2 != 0);
73
    }
74
}
75
76
void zmq::thread_t::setSchedulingParameters (
77
  int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
78
{
79
    // not implemented
80
    LIBZMQ_UNUSED (priority_);
81
    LIBZMQ_UNUSED (scheduling_policy_);
82
    LIBZMQ_UNUSED (affinity_cpus_);
83
}
84
85
//  Windows variant: intentionally empty, scheduling parameters are not
//  implemented on this platform.
void zmq::thread_t::
  applySchedulingParameters () // to be called in secondary thread context
{
    //  Nothing to apply.
}
90
91
#ifdef _MSC_VER
92
93
namespace
94
{
95
#pragma pack(push, 8)
96
struct thread_info_t
97
{
98
    DWORD _type;
99
    LPCSTR _name;
100
    DWORD _thread_id;
101
    DWORD _flags;
102
};
103
#pragma pack(pop)
104
}
105
106
#endif
107
108
void zmq::thread_t::
109
  applyThreadName () // to be called in secondary thread context
110
{
111
    if (!_name[0] || !IsDebuggerPresent ())
112
        return;
113
114
#ifdef _MSC_VER
115
116
    thread_info_t thread_info;
117
    thread_info._type = 0x1000;
118
    thread_info._name = _name;
119
    thread_info._thread_id = -1;
120
    thread_info._flags = 0;
121
122
    __try {
123
        const DWORD MS_VC_EXCEPTION = 0x406D1388;
124
        RaiseException (MS_VC_EXCEPTION, 0,
125
                        sizeof (thread_info) / sizeof (ULONG_PTR),
126
                        (ULONG_PTR *) &thread_info);
127
    }
128
    __except (EXCEPTION_CONTINUE_EXECUTION) {
129
    }
130
131
#elif defined(__MINGW32__)
132
133
    int rc = pthread_setname_np (pthread_self (), _name);
134
    if (rc)
135
        return;
136
137
#else
138
139
        // not implemented
140
141
#endif
142
}
143
144
145
#elif defined ZMQ_HAVE_VXWORKS
146
147
extern "C" {
148
static void *thread_routine (void *arg_)
149
{
150
    zmq::thread_t *self = (zmq::thread_t *) arg_;
151
    self->applySchedulingParameters ();
152
    self->_tfn (self->_arg);
153
    return NULL;
154
}
155
}
156
157
//  Spawns a VxWorks task that runs tfn_ (arg_). The name_ argument is
//  ignored on this platform. The object is marked as started only when the
//  descriptor check below passes.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
{
    LIBZMQ_UNUSED (name_);

    _tfn = tfn_;
    _arg = arg_;

    //  NOTE(review): `this` is passed through an int argument slot;
    //  confirm this cannot truncate the pointer on the supported targets.
    _descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS,
                             DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine,
                             (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0);

    //  NOTE(review): as written this accepts any non-NULL descriptor;
    //  confirm which value taskSpawn () returns on failure.
    if (_descriptor != NULL || _descriptor > 0) {
        _started = true;
    }
}
168
169
void zmq::thread_t::stop ()
170
{
171
    if (_started)
172
        while ((_descriptor != NULL || _descriptor > 0)
173
               && taskIdVerify (_descriptor) == 0) {
174
        }
175
}
176
177
bool zmq::thread_t::is_current_thread () const
178
{
179
    return taskIdSelf () == _descriptor;
180
}
181
182
void zmq::thread_t::setSchedulingParameters (
183
  int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
184
{
185
    _thread_priority = priority_;
186
    _thread_sched_policy = schedulingPolicy_;
187
    _thread_affinity_cpus = affinity_cpus_;
188
}
189
190
void zmq::thread_t::
191
  applySchedulingParameters () // to be called in secondary thread context
192
{
193
    int priority =
194
      (_thread_priority >= 0 ? _thread_priority : DEFAULT_PRIORITY);
195
    priority = (priority < UCHAR_MAX ? priority : DEFAULT_PRIORITY);
196
    if (_descriptor != NULL || _descriptor > 0) {
197
        taskPrioritySet (_descriptor, priority);
198
    }
199
}
200
201
//  VxWorks variant: intentionally empty, thread naming is not implemented
//  on this platform.
void zmq::thread_t::
  applyThreadName () // to be called in secondary thread context
{
    //  Nothing to apply.
}
206
207
#else
208
209
#include <signal.h>
210
#include <unistd.h>
211
#include <sys/time.h>
212
#include <sys/resource.h>
213
214
extern "C" {
215
static void *thread_routine (void *arg_)
216
0
{
217
0
#if !defined ZMQ_HAVE_OPENVMS && !defined ZMQ_HAVE_ANDROID
218
    //  Following code will guarantee more predictable latencies as it'll
219
    //  disallow any signal handling in the I/O thread.
220
0
    sigset_t signal_set;
221
0
    int rc = sigfillset (&signal_set);
222
0
    errno_assert (rc == 0);
223
0
    rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
224
0
    posix_assert (rc);
225
0
#endif
226
0
    zmq::thread_t *self = (zmq::thread_t *) arg_;
227
0
    self->applySchedulingParameters ();
228
0
    self->applyThreadName ();
229
0
    self->_tfn (self->_arg);
230
0
    return NULL;
231
0
}
232
}
233
234
//  Launches a new POSIX thread that runs tfn_ (arg_).
//
//  tfn_   function executed by the new thread
//  arg_   opaque argument forwarded to tfn_
//  name_  optional thread name; when NULL the stored name is left untouched
//
//  A pthread_create () failure is reported through posix_assert. On return
//  the object is marked as started.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
{
    _tfn = tfn_;
    _arg = arg_;
    if (name_) {
        //  strncpy does not write a terminator when name_ is at least
        //  sizeof (_name) - 1 characters long, so terminate explicitly
        //  instead of relying on the buffer's previous contents.
        strncpy (_name, name_, sizeof (_name) - 1);
        _name[sizeof (_name) - 1] = '\0';
    }
    int rc = pthread_create (&_descriptor, NULL, thread_routine, this);
    posix_assert (rc);
    _started = true;
}
244
245
void zmq::thread_t::stop ()
246
0
{
247
0
    if (_started) {
248
0
        int rc = pthread_join (_descriptor, NULL);
249
0
        posix_assert (rc);
250
0
    }
251
0
}
252
253
bool zmq::thread_t::is_current_thread () const
254
0
{
255
0
    return bool (pthread_equal (pthread_self (), _descriptor));
256
0
}
257
258
void zmq::thread_t::setSchedulingParameters (
259
  int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
260
0
{
261
0
    _thread_priority = priority_;
262
0
    _thread_sched_policy = scheduling_policy_;
263
0
    _thread_affinity_cpus = affinity_cpus_;
264
0
}
265
266
void zmq::thread_t::
267
  applySchedulingParameters () // to be called in secondary thread context
268
0
{
269
0
#if defined _POSIX_THREAD_PRIORITY_SCHEDULING                                  \
270
0
  && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0
271
0
    int policy = 0;
272
0
    struct sched_param param;
273
274
#if _POSIX_THREAD_PRIORITY_SCHEDULING == 0                                     \
275
  && defined _SC_THREAD_PRIORITY_SCHEDULING
276
    if (sysconf (_SC_THREAD_PRIORITY_SCHEDULING) < 0) {
277
        return;
278
    }
279
#endif
280
0
    int rc = pthread_getschedparam (pthread_self (), &policy, &param);
281
0
    posix_assert (rc);
282
283
0
    if (_thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT) {
284
0
        policy = _thread_sched_policy;
285
0
    }
286
287
    /* Quoting docs:
288
       "Linux allows the static priority range 1 to 99 for the SCHED_FIFO and
289
       SCHED_RR policies, and the priority 0 for the remaining policies."
290
       Other policies may use the "nice value" in place of the priority:
291
    */
292
0
    bool use_nice_instead_priority =
293
0
      (policy != SCHED_FIFO) && (policy != SCHED_RR);
294
295
0
    if (use_nice_instead_priority)
296
0
        param.sched_priority =
297
0
          0; // this is the only supported priority for most scheduling policies
298
0
    else if (_thread_priority != ZMQ_THREAD_PRIORITY_DFLT)
299
0
        param.sched_priority =
300
0
          _thread_priority; // user should provide a value between 1 and 99
301
302
#ifdef __NetBSD__
303
    if (policy == SCHED_OTHER)
304
        param.sched_priority = -1;
305
#endif
306
307
0
    rc = pthread_setschedparam (pthread_self (), policy, &param);
308
309
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
310
    // If this feature is unavailable at run-time, don't abort.
311
    if (rc == ENOSYS)
312
        return;
313
#endif
314
315
0
    posix_assert (rc);
316
317
0
#if !defined ZMQ_HAVE_VXWORKS
318
0
    if (use_nice_instead_priority
319
0
        && _thread_priority != ZMQ_THREAD_PRIORITY_DFLT
320
0
        && _thread_priority > 0) {
321
        // assume the user wants to decrease the thread's nice value
322
        // i.e., increase the chance of this thread being scheduled: try setting that to
323
        // maximum priority.
324
0
        rc = nice (-20);
325
326
0
        errno_assert (rc != -1);
327
        // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
328
        //            CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
329
0
    }
330
0
#endif
331
332
0
#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
333
0
    if (!_thread_affinity_cpus.empty ()) {
334
0
        cpu_set_t cpuset;
335
0
        CPU_ZERO (&cpuset);
336
0
        for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin (),
337
0
                                           end = _thread_affinity_cpus.end ();
338
0
             it != end; it++) {
339
0
            CPU_SET ((int) (*it), &cpuset);
340
0
        }
341
0
        rc =
342
0
          pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
343
0
        posix_assert (rc);
344
0
    }
345
0
#endif
346
0
#endif
347
0
}
348
349
void zmq::thread_t::
350
  applyThreadName () // to be called in secondary thread context
351
0
{
352
    /* The thread name is a cosmetic string, added to ease debugging of
353
 * multi-threaded applications. It is not a big issue if this value
354
 * can not be set for any reason (such as Permission denied in some
355
 * cases where the application changes its EUID, etc.) The value of
356
 * "int rc" is retained where available, to help debuggers stepping
357
 * through code to see its value - but otherwise it is ignored.
358
 */
359
0
    if (!_name[0])
360
0
        return;
361
362
        /* Fails with permission denied on Android 5/6 */
363
#if defined(ZMQ_HAVE_ANDROID)
364
    return;
365
#endif
366
367
#if defined(ZMQ_HAVE_PTHREAD_SETNAME_1)
368
    int rc = pthread_setname_np (_name);
369
    if (rc)
370
        return;
371
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_2)
372
0
    int rc = pthread_setname_np (pthread_self (), _name);
373
0
    if (rc)
374
0
        return;
375
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_3)
376
    int rc = pthread_setname_np (pthread_self (), _name, NULL);
377
    if (rc)
378
        return;
379
#elif defined(ZMQ_HAVE_PTHREAD_SET_NAME)
380
    pthread_set_name_np (pthread_self (), _name);
381
#endif
382
0
}
383
384
#endif