Coverage Report

Created: 2025-07-11 07:06

/src/open62541/arch/posix/eventloop_posix.c
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes)
7
 */
8
9
#include "eventloop_posix.h"
10
#include "open62541/plugin/eventloop.h"
11
12
#if defined(UA_ARCHITECTURE_POSIX) && !defined(UA_ARCHITECTURE_LWIP) || defined(UA_ARCHITECTURE_WIN32)
13
14
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
15
#include <time.h>
16
#endif
17
18
/*********/
19
/* Timer */
20
/*********/
21
22
static UA_DateTime
23
999
UA_EventLoopPOSIX_nextTimer(UA_EventLoop *public_el) {
24
999
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
25
999
    if(el->delayedHead1 > (UA_DelayedCallback *)0x01 ||
26
999
       el->delayedHead2 > (UA_DelayedCallback *)0x01)
27
103
        return el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
28
896
    return UA_Timer_next(&el->timer);
29
999
}
30
31
static UA_StatusCode
32
UA_EventLoopPOSIX_addTimer(UA_EventLoop *public_el, UA_Callback cb,
33
                           void *application, void *data, UA_Double interval_ms,
34
                           UA_DateTime *baseTime, UA_TimerPolicy timerPolicy,
35
708
                           UA_UInt64 *callbackId) {
36
708
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
37
708
    return UA_Timer_add(&el->timer, cb, application, data, interval_ms,
38
708
                        public_el->dateTime_nowMonotonic(public_el),
39
708
                        baseTime, timerPolicy, callbackId);
40
708
}
41
42
static UA_StatusCode
43
UA_EventLoopPOSIX_modifyTimer(UA_EventLoop *public_el,
44
                              UA_UInt64 callbackId,
45
                              UA_Double interval_ms,
46
                              UA_DateTime *baseTime,
47
0
                              UA_TimerPolicy timerPolicy) {
48
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
49
0
    return UA_Timer_modify(&el->timer, callbackId, interval_ms,
50
0
                           public_el->dateTime_nowMonotonic(public_el),
51
0
                           baseTime, timerPolicy);
52
0
}
53
54
static void
55
UA_EventLoopPOSIX_removeTimer(UA_EventLoop *public_el,
56
708
                              UA_UInt64 callbackId) {
57
708
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
58
708
    UA_Timer_remove(&el->timer, callbackId);
59
708
}
60
61
void
62
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
63
1.19k
                                     UA_DelayedCallback *dc) {
64
1.19k
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
65
1.19k
    dc->next = NULL;
66
67
    /* el->delayedTail points either to prev->next or to the head.
68
     * We need to update two locations:
69
     * 1: el->delayedTail = &dc->next;
70
     * 2: *oldtail = dc; (equal to &dc->next)
71
     *
72
     * Once we have (1), we "own" the previous-to-last entry. No need to worry
73
     * about (2), we can adjust it with a delay. This makes the queue
74
     * "eventually consistent". */
75
1.19k
    UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
76
1.19k
        UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
77
1.19k
    UA_atomic_xchg((void**)oldtail, &dc->next);
78
1.19k
}
79
80
/* Resets the delayed queue and returns the previous head and tail */
81
static void
82
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
83
1.69k
                  UA_DelayedCallback **oldTail) {
84
1.69k
    if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
85
1.69k
       el->delayedHead2 <= (UA_DelayedCallback *)0x01)
86
1.03k
        return; /* The queue is empty */
87
88
658
    UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
89
658
    UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
90
658
    UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;
91
92
    /* Switch active/inactive by resetting the sentinel values. The (old) active
93
     * head points to an element which we return. Parallel threads continue to
94
     * add elements to the queue "below" the first element. */
95
658
    UA_atomic_xchg((void**)inactiveHead, NULL);
96
658
    *oldHead = (UA_DelayedCallback *)
97
658
        UA_atomic_xchg((void**)activeHead, (void*)0x01);
98
99
    /* Make the tail point to the (new) active head. Return the value of last
100
     * tail. When iterating over the queue elements, we need to find this tail
101
     * as the last element. If we find a NULL next-pointer before hitting the
102
     * tail spinlock until the pointer updates (eventually consistent). */
103
658
    *oldTail = (UA_DelayedCallback*)
104
658
        UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
105
658
}
106
107
static void
108
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
109
113
                                        UA_DelayedCallback *dc) {
110
113
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
111
113
    UA_LOCK(&el->elMutex);
112
113
    /* Reset and get the old head and tail */
114
113
    UA_DelayedCallback *cur = NULL, *tail = NULL;
115
113
    resetDelayedQueue(el, &cur, &tail);
116
117
    /* Loop until we reach the tail (or head and tail are both NULL) */
118
113
    UA_DelayedCallback *next;
119
113
    for(; cur; cur = next) {
120
        /* Spin-loop until the next-pointer of cur is updated.
121
         * The element pointed to by tail must appear eventually. */
122
0
        next = cur->next;
123
0
        while(!next && cur != tail)
124
0
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
125
0
        if(cur == dc)
126
0
            continue;
127
0
        UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
128
0
    }
129
130
113
    UA_UNLOCK(&el->elMutex);
131
113
}
132
133
static void
134
1.57k
processDelayed(UA_EventLoopPOSIX *el) {
135
1.57k
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
136
1.57k
                 "Process delayed callbacks");
137
138
1.57k
    UA_LOCK_ASSERT(&el->elMutex);
139
140
    /* Reset and get the old head and tail */
141
1.57k
    UA_DelayedCallback *dc = NULL, *tail = NULL;
142
1.57k
    resetDelayedQueue(el, &dc, &tail);
143
144
    /* Loop until we reach the tail (or head and tail are both NULL) */
145
1.57k
    UA_DelayedCallback *next;
146
2.77k
    for(; dc; dc = next) {
147
1.19k
        next = dc->next;
148
1.19k
        while(!next && dc != tail)
149
0
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
150
1.19k
        if(!dc->callback)
151
0
            continue;
152
1.19k
        dc->callback(dc->application, dc->context);
153
1.19k
    }
154
1.57k
}
155
156
/***********************/
157
/* EventLoop Lifecycle */
158
/***********************/
159
160
static UA_StatusCode
161
378
UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) {
162
378
    UA_LOCK(&el->elMutex);
163
164
378
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH &&
165
378
       el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) {
166
0
        UA_UNLOCK(&el->elMutex);
167
0
        return UA_STATUSCODE_BADINTERNALERROR;
168
0
    }
169
170
378
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
171
378
                 "Starting the EventLoop");
172
173
    /* Setting the clock source */
174
378
    const UA_Int32 *cs = (const UA_Int32*)
175
378
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
176
378
                                 UA_QUALIFIEDNAME(0, "clock-source"),
177
378
                                 &UA_TYPES[UA_TYPES_INT32]);
178
378
    const UA_Int32 *csm = (const UA_Int32*)
179
378
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
180
378
                                 UA_QUALIFIEDNAME(0, "clock-source-monotonic"),
181
378
                                 &UA_TYPES[UA_TYPES_INT32]);
182
378
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
183
378
    el->clockSource = CLOCK_REALTIME;
184
378
    if(cs)
185
0
        el->clockSource = *cs;
186
187
378
# ifdef CLOCK_MONOTONIC_RAW
188
378
    el->clockSourceMonotonic = CLOCK_MONOTONIC_RAW;
189
# else
190
    el->clockSourceMonotonic = CLOCK_MONOTONIC;
191
# endif
192
378
    if(csm)
193
0
        el->clockSourceMonotonic = *csm;
194
#else
195
    if(cs || csm) {
196
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
197
                       "Eventloop\t| Cannot set a custom clock source");
198
    }
199
#endif
200
201
    /* Create the self-pipe */
202
378
    int err = UA_EventLoopPOSIX_pipe(el->selfpipe);
203
378
    if(err != 0) {
204
0
        UA_LOG_SOCKET_ERRNO_WRAP(
205
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
206
0
                          "Eventloop\t| Could not create the self-pipe (%s)",
207
0
                          errno_str));
208
0
        UA_UNLOCK(&el->elMutex);
209
0
        return UA_STATUSCODE_BADINTERNALERROR;
210
0
    }
211
212
    /* Create the epoll socket */
213
378
#ifdef UA_HAVE_EPOLL
214
378
    el->epollfd = epoll_create1(0);
215
378
    if(el->epollfd == -1) {
216
0
        UA_LOG_SOCKET_ERRNO_WRAP(
217
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
218
0
                          "Eventloop\t| Could not create the epoll socket (%s)",
219
0
                          errno_str));
220
0
        UA_close(el->selfpipe[0]);
221
0
        UA_close(el->selfpipe[1]);
222
0
        UA_UNLOCK(&el->elMutex);
223
0
        return UA_STATUSCODE_BADINTERNALERROR;
224
0
    }
225
226
    /* epoll always listens on the self-pipe. This is the only epoll_event that
227
     * has a NULL data pointer. */
228
378
    struct epoll_event event;
229
378
    memset(&event, 0, sizeof(struct epoll_event));
230
378
    event.events = EPOLLIN;
231
378
    err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, el->selfpipe[0], &event);
232
378
    if(err != 0) {
233
0
        UA_LOG_SOCKET_ERRNO_WRAP(
234
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
235
0
                          "Eventloop\t| Could not register the self-pipe for epoll (%s)",
236
0
                          errno_str));
237
0
        UA_close(el->selfpipe[0]);
238
0
        UA_close(el->selfpipe[1]);
239
0
        close(el->epollfd);
240
0
        UA_UNLOCK(&el->elMutex);
241
0
        return UA_STATUSCODE_BADINTERNALERROR;
242
0
    }
243
378
#endif
244
245
    /* Start the EventSources */
246
378
    UA_StatusCode res = UA_STATUSCODE_GOOD;
247
378
    UA_EventSource *es = el->eventLoop.eventSources;
248
1.89k
    while(es) {
249
1.51k
        res |= es->start(es);
250
1.51k
        es = es->next;
251
1.51k
    }
252
253
    /* Dirty-write the state that is const "from the outside" */
254
378
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
255
378
        UA_EVENTLOOPSTATE_STARTED;
256
257
378
    UA_UNLOCK(&el->elMutex);
258
378
    return res;
259
378
}
260
261
static void
262
579
checkClosed(UA_EventLoopPOSIX *el) {
263
579
    UA_LOCK_ASSERT(&el->elMutex);
264
265
579
    UA_EventSource *es = el->eventLoop.eventSources;
266
2.89k
    while(es) {
267
2.31k
        if(es->state != UA_EVENTSOURCESTATE_STOPPED)
268
0
            return;
269
2.31k
        es = es->next;
270
2.31k
    }
271
272
    /* Not closed until all delayed callbacks are processed */
273
579
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
274
201
        return;
275
276
    /* Close the self-pipe when everything else is done */
277
378
    UA_close(el->selfpipe[0]);
278
378
    UA_close(el->selfpipe[1]);
279
280
    /* Dirty-write the state that is const "from the outside" */
281
378
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
282
378
        UA_EVENTLOOPSTATE_STOPPED;
283
284
    /* Close the epoll/IOCP socket once all EventSources have shut down */
285
378
#ifdef UA_HAVE_EPOLL
286
378
    UA_close(el->epollfd);
287
378
#endif
288
289
378
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
290
378
                 "The EventLoop has stopped");
291
378
}
292
293
static void
294
378
UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) {
295
378
    UA_LOCK(&el->elMutex);
296
297
378
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) {
298
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
299
0
                       "The EventLoop is not running, cannot be stopped");
300
0
        UA_UNLOCK(&el->elMutex);
301
0
        return;
302
0
    }
303
304
378
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
305
378
                 "Stopping the EventLoop");
306
307
    /* Set to STOPPING to prevent "normal use" */
308
378
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
309
378
        UA_EVENTLOOPSTATE_STOPPING;
310
311
    /* Stop all event sources (asynchronous) */
312
378
    UA_EventSource *es = el->eventLoop.eventSources;
313
1.89k
    for(; es; es = es->next) {
314
1.51k
        if(es->state == UA_EVENTSOURCESTATE_STARTING ||
315
1.51k
           es->state == UA_EVENTSOURCESTATE_STARTED) {
316
1.51k
            es->stop(es);
317
1.51k
        }
318
1.51k
    }
319
320
    /* Set to STOPPED if all EventSources are STOPPED */
321
378
    checkClosed(el);
322
323
378
    UA_UNLOCK(&el->elMutex);
324
378
}
325
326
static UA_StatusCode
327
1.20k
UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
328
1.20k
    UA_LOCK(&el->elMutex);
329
330
1.20k
    if(el->executing) {
331
0
        UA_LOG_ERROR(el->eventLoop.logger,
332
0
                     UA_LOGCATEGORY_EVENTLOOP,
333
0
                     "Cannot run EventLoop from the run method itself");
334
0
        UA_UNLOCK(&el->elMutex);
335
0
        return UA_STATUSCODE_BADINTERNALERROR;
336
0
    }
337
338
1.20k
    el->executing = true;
339
340
1.20k
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH ||
341
1.20k
       el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) {
342
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
343
0
                       "Cannot run a stopped EventLoop");
344
0
        el->executing = false;
345
0
        UA_UNLOCK(&el->elMutex);
346
0
        return UA_STATUSCODE_BADINTERNALERROR;
347
0
    }
348
349
1.20k
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
350
1.20k
                 "Iterate the EventLoop");
351
352
    /* Process cyclic callbacks */
353
1.20k
    UA_DateTime dateBefore =
354
1.20k
        el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
355
356
1.20k
    UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore);
357
358
    /* Process delayed callbacks here:
359
     * - Removes closed sockets already here instead of polling them again.
360
     * - The timeout for polling is selected to be ready in time for the next
361
     *   cyclic callback. So we want to do little work between the timeout
362
     *   running out and executing the due cyclic callbacks. */
363
1.20k
    processDelayed(el);
364
365
    /* A delayed callback could create another delayed callback (or re-add
366
     * itself). In that case we don't want to wait (indefinitely) for an event
367
     * to happen. Process queued events but don't sleep. Then process the
368
     * delayed callbacks in the next iteration. */
369
1.20k
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
370
0
        timeout = 0;
371
372
    /* Compute the remaining time */
373
1.20k
    UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC);
374
1.20k
    if(dateNext > maxDate)
375
1.20k
        dateNext = maxDate;
376
1.20k
    UA_DateTime listenTimeout =
377
1.20k
        dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
378
1.20k
    if(listenTimeout < 0)
379
563
        listenTimeout = 0;
380
381
    /* Listen on the active file-descriptors (sockets) from the
382
     * ConnectionManagers */
383
1.20k
    UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout);
384
385
    /* Check if the last EventSource was successfully stopped */
386
1.20k
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING)
387
201
        checkClosed(el);
388
389
1.20k
    el->executing = false;
390
1.20k
    UA_UNLOCK(&el->elMutex);
391
1.20k
    return rv;
392
1.20k
}
393
394
/*****************************/
395
/* Registering Event Sources */
396
/*****************************/
397
398
static UA_StatusCode
399
UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el,
400
1.51k
                                      UA_EventSource *es) {
401
1.51k
    UA_LOCK(&el->elMutex);
402
403
    /* Already registered? */
404
1.51k
    if(es->state != UA_EVENTSOURCESTATE_FRESH) {
405
0
        UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
406
0
                     "Cannot register the EventSource \"%.*s\": "
407
0
                     "already registered",
408
0
                     (int)es->name.length, (char*)es->name.data);
409
0
        UA_UNLOCK(&el->elMutex);
410
0
        return UA_STATUSCODE_BADINTERNALERROR;
411
0
    }
412
413
    /* Add to linked list */
414
1.51k
    es->next = el->eventLoop.eventSources;
415
1.51k
    el->eventLoop.eventSources = es;
416
417
1.51k
    es->eventLoop = &el->eventLoop;
418
1.51k
    es->state = UA_EVENTSOURCESTATE_STOPPED;
419
420
    /* Start if the entire EventLoop is started */
421
1.51k
    UA_StatusCode res = UA_STATUSCODE_GOOD;
422
1.51k
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED)
423
0
        res = es->start(es);
424
425
1.51k
    UA_UNLOCK(&el->elMutex);
426
1.51k
    return res;
427
1.51k
}
428
429
static UA_StatusCode
430
UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el,
431
1.51k
                                        UA_EventSource *es) {
432
1.51k
    UA_LOCK(&el->elMutex);
433
434
1.51k
    if(es->state != UA_EVENTSOURCESTATE_STOPPED) {
435
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
436
0
                       "Cannot deregister the EventSource %.*s: "
437
0
                       "Has to be stopped first",
438
0
                       (int)es->name.length, es->name.data);
439
0
        UA_UNLOCK(&el->elMutex);
440
0
        return UA_STATUSCODE_BADINTERNALERROR;
441
0
    }
442
443
    /* Remove from the linked list */
444
1.51k
    UA_EventSource **s = &el->eventLoop.eventSources;
445
1.51k
    while(*s) {
446
1.51k
        if(*s == es) {
447
1.51k
            *s = es->next;
448
1.51k
            break;
449
1.51k
        }
450
0
        s = &(*s)->next;
451
0
    }
452
453
    /* Set the state to non-registered */
454
1.51k
    es->state = UA_EVENTSOURCESTATE_FRESH;
455
456
1.51k
    UA_UNLOCK(&el->elMutex);
457
1.51k
    return UA_STATUSCODE_GOOD;
458
1.51k
}
459
460
/***************/
461
/* Time Domain */
462
/***************/
463
464
static UA_DateTime
465
438k
UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) {
466
438k
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
467
438k
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
468
438k
    struct timespec ts;
469
438k
    int res = clock_gettime(pel->clockSource, &ts);
470
438k
    if(UA_UNLIKELY(res != 0))
471
0
        return 0;
472
438k
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
473
#else
474
    return UA_DateTime_now();
475
#endif
476
438k
}
477
478
static UA_DateTime
479
4.71k
UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) {
480
4.71k
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
481
4.71k
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
482
4.71k
    struct timespec ts;
483
4.71k
    int res = clock_gettime(pel->clockSourceMonotonic, &ts);
484
4.71k
    if(UA_UNLIKELY(res != 0))
485
0
        return 0;
486
    /* Also add the unix epoch for the monotonic clock. So we get a "normal"
487
     * output when a "normal" source is configured. */
488
4.71k
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
489
#else
490
    return UA_DateTime_nowMonotonic();
491
#endif
492
4.71k
}
493
494
static UA_Int64
495
0
UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) {
496
    /* TODO: Fix for custom clock sources */
497
0
    return UA_DateTime_localTimeUtcOffset();
498
0
}
499
500
/*************************/
501
/* Initialize and Delete */
502
/*************************/
503
504
static UA_StatusCode
505
378
UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) {
506
378
    UA_LOCK(&el->elMutex);
507
508
    /* Check if the EventLoop can be deleted */
509
378
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED &&
510
378
       el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) {
511
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
512
0
                       "Cannot delete a running EventLoop");
513
0
        UA_UNLOCK(&el->elMutex);
514
0
        return UA_STATUSCODE_BADINTERNALERROR;
515
0
    }
516
517
    /* Deregister and delete all the EventSources */
518
1.89k
    while(el->eventLoop.eventSources) {
519
1.51k
        UA_EventSource *es = el->eventLoop.eventSources;
520
1.51k
        UA_EventLoopPOSIX_deregisterEventSource(el, es);
521
1.51k
        es->free(es);
522
1.51k
    }
523
524
    /* Remove the repeated timed callbacks */
525
378
    UA_Timer_clear(&el->timer);
526
527
    /* Process remaining delayed callbacks */
528
378
    processDelayed(el);
529
530
#ifdef UA_ARCHITECTURE_WIN32
531
    /* Stop the Windows networking subsystem */
532
    WSACleanup();
533
#endif
534
535
378
    UA_KeyValueMap_clear(&el->eventLoop.params);
536
537
    /* Clean up */
538
378
    UA_UNLOCK(&el->elMutex);
539
378
    UA_LOCK_DESTROY(&el->elMutex);
540
378
    UA_free(el);
541
378
    return UA_STATUSCODE_GOOD;
542
378
}
543
544
static void
545
847k
UA_EventLoopPOSIX_lock(UA_EventLoop *public_el) {
546
847k
    UA_LOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
547
847k
}
548
static void
549
847k
UA_EventLoopPOSIX_unlock(UA_EventLoop *public_el) {
550
847k
    UA_UNLOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
551
847k
}
552
553
UA_EventLoop *
554
378
UA_EventLoop_new_POSIX(const UA_Logger *logger) {
555
378
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)
556
378
        UA_calloc(1, sizeof(UA_EventLoopPOSIX));
557
378
    if(!el)
558
0
        return NULL;
559
560
378
    UA_LOCK_INIT(&el->elMutex);
561
378
    UA_Timer_init(&el->timer);
562
563
    /* Initialize the queue */
564
378
    el->delayedTail = &el->delayedHead1;
565
378
    el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */
566
567
#ifdef UA_ARCHITECTURE_WIN32
568
    /* Start the WSA networking subsystem on Windows */
569
    WSADATA wsaData;
570
    WSAStartup(MAKEWORD(2, 2), &wsaData);
571
#endif
572
573
    /* Set the public EventLoop content */
574
378
    el->eventLoop.logger = logger;
575
576
378
    el->eventLoop.start = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start;
577
378
    el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop;
578
378
    el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free;
579
378
    el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run;
580
378
    el->eventLoop.cancel = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_cancel;
581
582
378
    el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now;
583
378
    el->eventLoop.dateTime_nowMonotonic =
584
378
        UA_EventLoopPOSIX_DateTime_nowMonotonic;
585
378
    el->eventLoop.dateTime_localTimeUtcOffset =
586
378
        UA_EventLoopPOSIX_DateTime_localTimeUtcOffset;
587
588
378
    el->eventLoop.nextTimer = UA_EventLoopPOSIX_nextTimer;
589
378
    el->eventLoop.addTimer = UA_EventLoopPOSIX_addTimer;
590
378
    el->eventLoop.modifyTimer = UA_EventLoopPOSIX_modifyTimer;
591
378
    el->eventLoop.removeTimer = UA_EventLoopPOSIX_removeTimer;
592
378
    el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback;
593
378
    el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback;
594
595
378
    el->eventLoop.registerEventSource =
596
378
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
597
378
        UA_EventLoopPOSIX_registerEventSource;
598
378
    el->eventLoop.deregisterEventSource =
599
378
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
600
378
        UA_EventLoopPOSIX_deregisterEventSource;
601
602
378
    el->eventLoop.lock = UA_EventLoopPOSIX_lock;
603
378
    el->eventLoop.unlock = UA_EventLoopPOSIX_unlock;
604
605
378
    return &el->eventLoop;
606
378
}
607
608
/***************************/
609
/* Network Buffer Handling */
610
/***************************/
611
612
UA_StatusCode
613
UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm,
614
                                     uintptr_t connectionId,
615
                                     UA_ByteString *buf,
616
104
                                     size_t bufSize) {
617
104
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
618
104
    if(pcm->txBuffer.length == 0)
619
104
        return UA_ByteString_allocBuffer(buf, bufSize);
620
0
    if(pcm->txBuffer.length < bufSize)
621
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
622
0
    *buf = pcm->txBuffer;
623
0
    buf->length = bufSize;
624
0
    return UA_STATUSCODE_GOOD;
625
0
}
626
627
void
628
UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm,
629
                                    uintptr_t connectionId,
630
104
                                    UA_ByteString *buf) {
631
104
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
632
104
    if(pcm->txBuffer.data == buf->data)
633
0
        UA_ByteString_init(buf);
634
104
    else
635
104
        UA_ByteString_clear(buf);
636
104
}
637
638
UA_StatusCode
639
1.13k
UA_EventLoopPOSIX_allocateStaticBuffers(UA_POSIXConnectionManager *pcm) {
640
1.13k
    UA_StatusCode res = UA_STATUSCODE_GOOD;
641
1.13k
    UA_UInt32 rxBufSize = 2u << 16; /* The default is 64kb */
642
1.13k
    const UA_UInt32 *configRxBufSize = (const UA_UInt32 *)
643
1.13k
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
644
1.13k
                                 UA_QUALIFIEDNAME(0, "recv-bufsize"),
645
1.13k
                                 &UA_TYPES[UA_TYPES_UINT32]);
646
1.13k
    if(configRxBufSize)
647
0
        rxBufSize = *configRxBufSize;
648
1.13k
    if(pcm->rxBuffer.length != rxBufSize) {
649
1.13k
        UA_ByteString_clear(&pcm->rxBuffer);
650
1.13k
        res = UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize);
651
1.13k
    }
652
653
1.13k
    const UA_UInt32 *txBufSize = (const UA_UInt32 *)
654
1.13k
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
655
1.13k
                                 UA_QUALIFIEDNAME(0, "send-bufsize"),
656
1.13k
                                 &UA_TYPES[UA_TYPES_UINT32]);
657
1.13k
    if(txBufSize && pcm->txBuffer.length != *txBufSize) {
658
0
        UA_ByteString_clear(&pcm->txBuffer);
659
0
        res |= UA_ByteString_allocBuffer(&pcm->txBuffer, *txBufSize);
660
0
    }
661
1.13k
    return res;
662
1.13k
}
663
664
/******************/
665
/* Socket Options */
666
/******************/
667
668
enum ZIP_CMP
669
2.00k
cmpFD(const UA_FD *a, const UA_FD *b) {
670
2.00k
    if(*a == *b)
671
821
        return ZIP_CMP_EQ;
672
1.17k
    return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE;
673
2.00k
}
674
675
UA_StatusCode
676
708
UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) {
677
708
#ifndef UA_ARCHITECTURE_WIN32
678
708
    int opts = fcntl(sockfd, F_GETFL);
679
708
    if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0)
680
0
        return UA_STATUSCODE_BADINTERNALERROR;
681
#else
682
    u_long iMode = 1;
683
    if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
684
        return UA_STATUSCODE_BADINTERNALERROR;
685
#endif
686
708
    return UA_STATUSCODE_GOOD;
687
708
}
688
689
UA_StatusCode
690
826
UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) {
691
#ifdef SO_NOSIGPIPE
692
    int val = 1;
693
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val));
694
    if(res < 0)
695
        return UA_STATUSCODE_BADINTERNALERROR;
696
#endif
697
826
    return UA_STATUSCODE_GOOD;
698
826
}
699
700
UA_StatusCode
701
821
UA_EventLoopPOSIX_setReusable(UA_FD sockfd) {
702
821
    int enableReuseVal = 1;
703
821
#ifndef UA_ARCHITECTURE_WIN32
704
821
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
705
821
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
706
821
    res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
707
821
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
708
821
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
709
#else
710
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
711
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
712
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
713
#endif
714
821
}
715
716
/************************/
717
/* Select / epoll Logic */
718
/************************/
719
720
/* Re-arm the self-pipe socket for the next signal by reading from it */
721
static void
722
0
flushSelfPipe(UA_SOCKET s) {
723
0
    char buf[128];
724
#ifdef UA_ARCHITECTURE_WIN32
725
    recv(s, buf, 128, 0);
726
#else
727
0
    ssize_t i;
728
0
    do {
729
0
        i = read(s, buf, 128);
730
0
    } while(i > 0);
731
0
#endif
732
0
}
733
734
#if !defined(UA_HAVE_EPOLL)
735
736
UA_StatusCode
737
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
738
    UA_LOCK_ASSERT(&el->elMutex);
739
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
740
                 "Registering fd: %u", (unsigned)rfd->fd);
741
742
    /* Realloc */
743
    UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
744
        UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * (el->fdsSize + 1));
745
    if(!fds_tmp) {
746
        return UA_STATUSCODE_BADOUTOFMEMORY;
747
    }
748
    el->fds = fds_tmp;
749
750
    /* Add to the last entry */
751
    el->fds[el->fdsSize] = rfd;
752
    el->fdsSize++;
753
    return UA_STATUSCODE_GOOD;
754
}
755
756
UA_StatusCode
757
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
758
    /* Do nothing, it is enough if the data was changed in the rfd */
759
    UA_LOCK_ASSERT(&el->elMutex);
760
    return UA_STATUSCODE_GOOD;
761
}
762
763
void
764
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
765
    UA_LOCK_ASSERT(&el->elMutex);
766
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
767
                 "Unregistering fd: %u", (unsigned)rfd->fd);
768
769
    /* Find the entry */
770
    size_t i = 0;
771
    for(; i < el->fdsSize; i++) {
772
        if(el->fds[i] == rfd)
773
            break;
774
    }
775
776
    /* Not found? */
777
    if(i == el->fdsSize)
778
        return;
779
780
    if(el->fdsSize > 1) {
781
        /* Move the last entry in the ith slot and realloc. */
782
        el->fdsSize--;
783
        el->fds[i] = el->fds[el->fdsSize];
784
        UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
785
            UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * el->fdsSize);
786
        /* if realloc fails the fds are still in a correct state with
787
         * possibly lost memory, so failing silently here is ok */
788
        if(fds_tmp)
789
            el->fds = fds_tmp;
790
    } else {
791
        /* Remove the last entry */
792
        UA_free(el->fds);
793
        el->fds = NULL;
794
        el->fdsSize = 0;
795
    }
796
}
797
798
static UA_FD
799
setFDSets(UA_EventLoopPOSIX *el, fd_set *readset, fd_set *writeset, fd_set *errset) {
800
    UA_LOCK_ASSERT(&el->elMutex);
801
802
    FD_ZERO(readset);
803
    FD_ZERO(writeset);
804
    FD_ZERO(errset);
805
806
    /* Always listen on the read-end of the pipe */
807
    UA_FD highestfd = el->selfpipe[0];
808
    FD_SET(el->selfpipe[0], readset);
809
810
    for(size_t i = 0; i < el->fdsSize; i++) {
811
        UA_FD currentFD = el->fds[i]->fd;
812
813
        /* Add to the fd_sets */
814
        if(el->fds[i]->listenEvents & UA_FDEVENT_IN)
815
            FD_SET(currentFD, readset);
816
        if(el->fds[i]->listenEvents & UA_FDEVENT_OUT)
817
            FD_SET(currentFD, writeset);
818
819
        /* Always return errors */
820
        FD_SET(currentFD, errset);
821
822
        /* Highest fd? */
823
        if(currentFD > highestfd)
824
            highestfd = currentFD;
825
    }
826
    return highestfd;
827
}
828
829
UA_StatusCode
830
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
831
    UA_assert(listenTimeout >= 0);
832
    UA_LOCK_ASSERT(&el->elMutex);
833
834
    fd_set readset, writeset, errset;
835
    UA_FD highestfd = setFDSets(el, &readset, &writeset, &errset);
836
837
    /* Nothing to do? */
838
    if(highestfd == UA_INVALID_FD) {
839
        UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
840
                     "No valid FDs for processing");
841
        return UA_STATUSCODE_GOOD;
842
    }
843
844
    struct timeval tmptv = {
845
#ifndef UA_ARCHITECTURE_WIN32
846
        (time_t)(listenTimeout / UA_DATETIME_SEC),
847
        (suseconds_t)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
848
#else
849
        (long)(listenTimeout / UA_DATETIME_SEC),
850
        (long)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
851
#endif
852
    };
853
854
    UA_UNLOCK(&el->elMutex);
855
    int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
856
    UA_LOCK(&el->elMutex);
857
    if(selectStatus < 0) {
858
        /* We will retry, only log the error */
859
        UA_LOG_SOCKET_ERRNO_WRAP(
860
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
861
                           "Error during select: %s", errno_str));
862
        return UA_STATUSCODE_GOOD;
863
    }
864
865
    /* The self-pipe has received. Clear the buffer by reading. */
866
    if(UA_UNLIKELY(FD_ISSET(el->selfpipe[0], &readset)))
867
        flushSelfPipe(el->selfpipe[0]);
868
869
    /* Loop over all registered FD to see if an event arrived. Yes, this is why
870
     * select is slow for many open sockets. */
871
    for(size_t i = 0; i < el->fdsSize; i++) {
872
        UA_RegisteredFD *rfd = el->fds[i];
873
874
        /* The rfd is already registered for removal. Don't process incoming
875
         * events any longer. */
876
        if(rfd->dc.callback)
877
            continue;
878
879
        /* Event signaled for the fd? */
880
        short event = 0;
881
        if(FD_ISSET(rfd->fd, &readset)) {
882
            event = UA_FDEVENT_IN;
883
        } else if(FD_ISSET(rfd->fd, &writeset)) {
884
            event = UA_FDEVENT_OUT;
885
        } else if(FD_ISSET(rfd->fd, &errset)) {
886
            event = UA_FDEVENT_ERR;
887
        } else {
888
            continue;
889
        }
890
891
        UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
892
                     "Processing event %u on fd %u", (unsigned)event,
893
                     (unsigned)rfd->fd);
894
895
        /* Call the EventSource callback */
896
        rfd->eventSourceCB(rfd->es, rfd, event);
897
898
        /* The fd has removed itself */
899
        if(i == el->fdsSize || rfd != el->fds[i])
900
            i--;
901
    }
902
    return UA_STATUSCODE_GOOD;
903
}
904
905
#else /* defined(UA_HAVE_EPOLL) */
906
907
UA_StatusCode
908
821
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
909
821
    struct epoll_event event;
910
821
    memset(&event, 0, sizeof(struct epoll_event));
911
821
    event.data.ptr = rfd;
912
821
    event.events = 0;
913
821
    if(rfd->listenEvents & UA_FDEVENT_IN)
914
644
        event.events |= EPOLLIN;
915
821
    if(rfd->listenEvents & UA_FDEVENT_OUT)
916
0
        event.events |= EPOLLOUT;
917
918
821
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, rfd->fd, &event);
919
821
    if(err != 0) {
920
0
        UA_LOG_SOCKET_ERRNO_WRAP(
921
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
922
0
                          "TCP %u\t| Could not register for epoll (%s)",
923
0
                          rfd->fd, errno_str));
924
0
        return UA_STATUSCODE_BADINTERNALERROR;
925
0
    }
926
821
    return UA_STATUSCODE_GOOD;
927
821
}
928
929
UA_StatusCode
930
0
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
931
0
    struct epoll_event event;
932
0
    memset(&event, 0, sizeof(struct epoll_event));
933
0
    event.data.ptr = rfd;
934
0
    event.events = 0;
935
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
936
0
        event.events |= EPOLLIN;
937
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
938
0
        event.events |= EPOLLOUT;
939
940
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_MOD, rfd->fd, &event);
941
0
    if(err != 0) {
942
0
        UA_LOG_SOCKET_ERRNO_WRAP(
943
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
944
0
                          "TCP %u\t| Could not modify for epoll (%s)",
945
0
                          rfd->fd, errno_str));
946
0
        return UA_STATUSCODE_BADINTERNALERROR;
947
0
    }
948
0
    return UA_STATUSCODE_GOOD;
949
0
}
950
951
void
952
821
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
953
821
    int res = epoll_ctl(el->epollfd, EPOLL_CTL_DEL, rfd->fd, NULL);
954
821
    if(res != 0) {
955
0
        UA_LOG_SOCKET_ERRNO_WRAP(
956
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
957
0
                          "TCP %u\t| Could not deregister from epoll (%s)",
958
0
                          rfd->fd, errno_str));
959
0
    }
960
821
}
961
962
UA_StatusCode
963
1.20k
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
964
1.20k
    UA_assert(listenTimeout >= 0);
965
966
    /* If there is a positive timeout, wait at least one millisecond, the
967
     * minimum for blocking epoll_wait. This prevents a busy-loop, as the
968
     * open62541 library allows even smaller timeouts, which can result in a
969
     * zero timeout due to rounding to an integer here. */
970
1.20k
    int timeout = (int)(listenTimeout / UA_DATETIME_MSEC);
971
1.20k
    if(timeout == 0 && listenTimeout > 0)
972
0
        timeout = 1;
973
974
    /* Poll the registered sockets */
975
1.20k
    struct epoll_event epoll_events[64];
976
1.20k
    UA_UNLOCK(&el->elMutex);
977
1.20k
    int events = epoll_wait(el->epollfd, epoll_events, 64, timeout);
978
1.20k
    UA_LOCK(&el->elMutex);
979
980
    /* TODO: Replace with pwait2 for higher-precision timeouts once this is
981
     * available in the standard library.
982
     *
983
     * struct timespec precisionTimeout = {
984
     *  (long)(listenTimeout / UA_DATETIME_SEC),
985
     *   (long)((listenTimeout % UA_DATETIME_SEC) * 100)
986
     * };
987
     * int events = epoll_pwait2(epollfd, epoll_events, 64,
988
     *                        precisionTimeout, NULL); */
989
990
    /* Handle error conditions */
991
1.20k
    if(events == -1) {
992
2
        if(errno == EINTR) {
993
            /* We will retry, only log the error */
994
2
            UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
995
2
                         "Timeout during poll");
996
2
            return UA_STATUSCODE_GOOD;
997
2
        }
998
0
        UA_LOG_SOCKET_ERRNO_WRAP(
999
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
1000
0
                          "TCP\t| Error %s, closing the server socket",
1001
0
                          errno_str));
1002
0
        return UA_STATUSCODE_BADINTERNALERROR;
1003
2
    }
1004
1005
    /* Process all received events */
1006
1.42k
    for(int i = 0; i < events; i++) {
1007
224
        UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr;
1008
1009
        /* The self-pipe has received */
1010
224
        if(!rfd) {
1011
0
            flushSelfPipe(el->selfpipe[0]);
1012
0
            continue;
1013
0
        }
1014
1015
        /* The rfd is already registered for removal. Don't process incoming
1016
         * events any longer. */
1017
224
        if(rfd->dc.callback)
1018
0
            continue;
1019
1020
        /* Get the event */
1021
224
        short revent = 0;
1022
224
        if((epoll_events[i].events & EPOLLIN) == EPOLLIN) {
1023
224
            revent = UA_FDEVENT_IN;
1024
224
        } else if((epoll_events[i].events & EPOLLOUT) == EPOLLOUT) {
1025
0
            revent = UA_FDEVENT_OUT;
1026
0
        } else {
1027
0
            revent = UA_FDEVENT_ERR;
1028
0
        }
1029
1030
        /* Call the EventSource callback */
1031
224
        rfd->eventSourceCB(rfd->es, rfd, revent);
1032
224
    }
1033
1.19k
    return UA_STATUSCODE_GOOD;
1034
1.20k
}
1035
1036
#endif /* defined(UA_HAVE_EPOLL) */
1037
1038
#if defined(UA_ARCHITECTURE_WIN32) || defined(__APPLE__)
1039
/* Close one of the sockets used for emulating the self-pipe */
static void
closeSelfPipeSocket(SOCKET sock) {
#ifdef UA_ARCHITECTURE_WIN32
    closesocket(sock);
#endif
#ifdef __APPLE__
    close(sock);
#endif
}

/* Emulate a pipe with a pair of connected TCP sockets on the loopback
 * interface. A temporary listen-socket is bound to an ephemeral port, fds[0]
 * connects to it and fds[1] is the accepted peer. Both ends are configured
 * as non-blocking.
 *
 * fds: Receives the two connected sockets. On failure both entries are set
 *      to UA_INVALID_FD and no socket is left open.
 *
 * Returns 0 on success and -1 on failure. */
int UA_EventLoopPOSIX_pipe(SOCKET fds[2]) {
    fds[0] = UA_INVALID_FD;
    fds[1] = UA_INVALID_FD;

    /* Listen on the loopback interface. Port 0 lets the system pick a free
     * port. */
    struct sockaddr_in inaddr;
    memset(&inaddr, 0, sizeof(inaddr));
    inaddr.sin_family = AF_INET;
    inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    inaddr.sin_port = 0;

    struct sockaddr_storage addr;
    memset(&addr, 0, sizeof(addr));
    int len = sizeof(addr);

    SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if(lst == UA_INVALID_FD)
        return -1;
    if(bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr)) != 0)
        goto error;
    if(listen(lst, 1) != 0)
        goto error;

    /* Look up the port that was actually assigned */
    if(getsockname(lst, (struct sockaddr*)&addr, &len) != 0)
        goto error;

    /* Connect the first end and accept the second end */
    fds[0] = socket(AF_INET, SOCK_STREAM, 0);
    if(fds[0] == UA_INVALID_FD)
        goto error;
    if(connect(fds[0], (struct sockaddr*)&addr, len) != 0)
        goto error;
    fds[1] = accept(lst, NULL, NULL);
    if(fds[1] == UA_INVALID_FD)
        goto error;

    /* The listen-socket is no longer needed */
    closeSelfPipeSocket(lst);

    UA_EventLoopPOSIX_setNoSigPipe(fds[0]);
    UA_EventLoopPOSIX_setReusable(fds[0]);
    UA_EventLoopPOSIX_setNonBlocking(fds[0]);
    UA_EventLoopPOSIX_setNoSigPipe(fds[1]);
    UA_EventLoopPOSIX_setReusable(fds[1]);
    UA_EventLoopPOSIX_setNonBlocking(fds[1]);
    return 0;

 error:
    /* fds[1] is only ever valid on the success path */
    if(fds[0] != UA_INVALID_FD) {
        closeSelfPipeSocket(fds[0]);
        fds[0] = UA_INVALID_FD;
    }
    closeSelfPipeSocket(lst);
    return -1;
}
1073
#elif defined(__QNX__)
1074
/* Create the self-pipe and switch its read-end (fds[0]) to non-blocking
 * mode. The write-end (fds[1]) is left untouched.
 *
 * Returns 0 on success. Returns -1 on failure with errno set by the failing
 * call. In that case no file descriptor is left open. */
int UA_EventLoopPOSIX_pipe(int fds[2]) {
    if(pipe(fds) == -1)
        return -1;

    if(fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) {
        /* Don't leak the pipe. Keep the errno of fcntl for the caller, as
         * close may overwrite it. */
        int savedErrno = errno;
        close(fds[0]);
        close(fds[1]);
        errno = savedErrno;
        return -1;
    }
    return 0;
}
1086
#endif
1087
1088
void
1089
0
UA_EventLoopPOSIX_cancel(UA_EventLoopPOSIX *el) {
1090
    /* Nothing to do if the EventLoop is not executing */
1091
0
    if(!el->executing)
1092
0
        return;
1093
1094
    /* Trigger the self-pipe */
1095
#ifdef UA_ARCHITECTURE_WIN32
1096
    int err = send(el->selfpipe[1], ".", 1, 0);
1097
#else
1098
0
    ssize_t err = write(el->selfpipe[1], ".", 1);
1099
0
#endif
1100
0
    if(err <= 0) {
1101
0
        UA_LOG_SOCKET_ERRNO_WRAP(
1102
0
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
1103
0
                           "Eventloop\t| Error signaling self-pipe (%s)", errno_str));
1104
0
    }
1105
0
}
1106
1107
#endif