Coverage Report

Created: 2025-11-11 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541/arch/posix/eventloop_posix.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes)
7
 */
8
9
#include "eventloop_posix.h"
10
#include "open62541/plugin/eventloop.h"
11
12
#if defined(UA_ARCHITECTURE_POSIX) && !defined(UA_ARCHITECTURE_LWIP) || defined(UA_ARCHITECTURE_WIN32)
13
14
/*********/
15
/* Timer */
16
/*********/
17
18
static UA_DateTime
19
0
UA_EventLoopPOSIX_nextTimer(UA_EventLoop *public_el) {
20
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
21
0
    if(el->delayedHead1 > (UA_DelayedCallback *)0x01 ||
22
0
       el->delayedHead2 > (UA_DelayedCallback *)0x01)
23
0
        return el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
24
0
    return UA_Timer_next(&el->timer);
25
0
}
26
27
static UA_StatusCode
28
UA_EventLoopPOSIX_addTimer(UA_EventLoop *public_el, UA_Callback cb,
29
                           void *application, void *data, UA_Double interval_ms,
30
                           UA_DateTime *baseTime, UA_TimerPolicy timerPolicy,
31
0
                           UA_UInt64 *callbackId) {
32
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
33
0
    return UA_Timer_add(&el->timer, cb, application, data, interval_ms,
34
0
                        public_el->dateTime_nowMonotonic(public_el),
35
0
                        baseTime, timerPolicy, callbackId);
36
0
}
37
38
static UA_StatusCode
39
UA_EventLoopPOSIX_modifyTimer(UA_EventLoop *public_el,
40
                              UA_UInt64 callbackId,
41
                              UA_Double interval_ms,
42
                              UA_DateTime *baseTime,
43
0
                              UA_TimerPolicy timerPolicy) {
44
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
45
0
    return UA_Timer_modify(&el->timer, callbackId, interval_ms,
46
0
                           public_el->dateTime_nowMonotonic(public_el),
47
0
                           baseTime, timerPolicy);
48
0
}
49
50
static void
51
UA_EventLoopPOSIX_removeTimer(UA_EventLoop *public_el,
52
0
                              UA_UInt64 callbackId) {
53
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
54
0
    UA_Timer_remove(&el->timer, callbackId);
55
0
}
56
57
void
58
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
59
0
                                     UA_DelayedCallback *dc) {
60
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
61
0
    dc->next = NULL;
62
63
    /* el->delayedTail points either to prev->next or to the head.
64
     * We need to update two locations:
65
     * 1: el->delayedTail = &dc->next;
66
     * 2: *oldtail = dc; (equal to &dc->next)
67
     *
68
     * Once we have (1), we "own" the previous-to-last entry. No need to worry
69
     * about (2), we can adjust it with a delay. This makes the queue
70
     * "eventually consistent". */
71
0
    UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
72
0
        UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
73
0
    UA_atomic_xchg((void**)oldtail, &dc->next);
74
0
}
75
76
/* Resets the delayed queue and returns the previous head and tail */
77
static void
78
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
79
0
                  UA_DelayedCallback **oldTail) {
80
0
    if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
81
0
       el->delayedHead2 <= (UA_DelayedCallback *)0x01)
82
0
        return; /* The queue is empty */
83
84
0
    UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
85
0
    UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
86
0
    UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;
87
88
    /* Switch active/inactive by resetting the sentinel values. The (old) active
89
     * head points to an element which we return. Parallel threads continue to
90
     * add elements to the queue "below" the first element. */
91
0
    UA_atomic_xchg((void**)inactiveHead, NULL);
92
0
    *oldHead = (UA_DelayedCallback *)
93
0
        UA_atomic_xchg((void**)activeHead, (void*)0x01);
94
95
    /* Make the tail point to the (new) active head. Return the value of last
96
     * tail. When iterating over the queue elements, we need to find this tail
97
     * as the last element. If we find a NULL next-pointer before hitting the
98
     * tail spinlock until the pointer updates (eventually consistent). */
99
0
    *oldTail = (UA_DelayedCallback*)
100
0
        UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
101
0
}
102
103
static void
104
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
105
0
                                        UA_DelayedCallback *dc) {
106
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
107
0
    UA_LOCK(&el->elMutex);
108
109
    /* Reset and get the old head and tail */
110
0
    UA_DelayedCallback *cur = NULL, *tail = NULL;
111
0
    resetDelayedQueue(el, &cur, &tail);
112
113
    /* Loop until we reach the tail (or head and tail are both NULL) */
114
0
    UA_DelayedCallback *next;
115
0
    for(; cur; cur = next) {
116
        /* Spin-loop until the next-pointer of cur is updated.
117
         * The element pointed to by tail must appear eventually. */
118
0
        next = cur->next;
119
0
        while(!next && cur != tail)
120
0
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
121
0
        if(cur == dc)
122
0
            continue;
123
0
        UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
124
0
    }
125
126
0
    UA_UNLOCK(&el->elMutex);
127
0
}
128
129
static void
130
0
processDelayed(UA_EventLoopPOSIX *el) {
131
0
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
132
0
                 "Process delayed callbacks");
133
134
0
    UA_LOCK_ASSERT(&el->elMutex);
135
136
    /* Reset and get the old head and tail */
137
0
    UA_DelayedCallback *dc = NULL, *tail = NULL;
138
0
    resetDelayedQueue(el, &dc, &tail);
139
140
    /* Loop until we reach the tail (or head and tail are both NULL) */
141
0
    UA_DelayedCallback *next;
142
0
    for(; dc; dc = next) {
143
0
        next = dc->next;
144
0
        while(!next && dc != tail)
145
0
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
146
0
        if(!dc->callback)
147
0
            continue;
148
0
        dc->callback(dc->application, dc->context);
149
0
    }
150
0
}
151
152
/***********************/
153
/* EventLoop Lifecycle */
154
/***********************/
155
156
static UA_StatusCode
157
0
UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) {
158
0
    UA_LOCK(&el->elMutex);
159
160
0
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH &&
161
0
       el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) {
162
0
        UA_UNLOCK(&el->elMutex);
163
0
        return UA_STATUSCODE_BADINTERNALERROR;
164
0
    }
165
166
0
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
167
0
                 "Starting the EventLoop");
168
169
    /* Setting custom clock source */
170
0
    const UA_Int32 *cs = (const UA_Int32*)
171
0
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
172
0
                                 UA_QUALIFIEDNAME(0, "clock-source"),
173
0
                                 &UA_TYPES[UA_TYPES_INT32]);
174
0
    if(cs)
175
0
        el->clockSource = *cs;
176
177
0
    const UA_Int32 *csm = (const UA_Int32*)
178
0
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
179
0
                                 UA_QUALIFIEDNAME(0, "clock-source-monotonic"),
180
0
                                 &UA_TYPES[UA_TYPES_INT32]);
181
0
    if(csm) {
182
0
        if(el->clockSourceMonotonic != *csm && el->timer.idTree.root) {
183
0
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
184
0
                           "Eventloop\t| Setting a different monotonic clock, ",
185
0
                           "but existing timers have been registered with a "
186
0
                           "different clock source");
187
0
        }
188
0
        el->clockSourceMonotonic = *csm;
189
0
    }
190
191
#if !defined(UA_ARCHITECTURE_POSIX)
192
    if(cs || csm) {
193
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
194
                       "Eventloop\t| Setting a different clock source ",
195
                       "not supported for this architecture");
196
    }
197
#endif
198
199
    /* Create the self-pipe */
200
0
    int err = UA_EventLoopPOSIX_pipe(el->selfpipe);
201
0
    if(err != 0) {
202
0
        UA_LOG_SOCKET_ERRNO_WRAP(
203
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
204
0
                          "Eventloop\t| Could not create the self-pipe (%s)",
205
0
                          errno_str));
206
0
        UA_UNLOCK(&el->elMutex);
207
0
        return UA_STATUSCODE_BADINTERNALERROR;
208
0
    }
209
210
    /* Create the epoll socket */
211
0
#ifdef UA_HAVE_EPOLL
212
0
    el->epollfd = epoll_create1(0);
213
0
    if(el->epollfd == -1) {
214
0
        UA_LOG_SOCKET_ERRNO_WRAP(
215
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
216
0
                          "Eventloop\t| Could not create the epoll socket (%s)",
217
0
                          errno_str));
218
0
        UA_close(el->selfpipe[0]);
219
0
        UA_close(el->selfpipe[1]);
220
0
        UA_UNLOCK(&el->elMutex);
221
0
        return UA_STATUSCODE_BADINTERNALERROR;
222
0
    }
223
224
    /* epoll always listens on the self-pipe. This is the only epoll_event that
225
     * has a NULL data pointer. */
226
0
    struct epoll_event event;
227
0
    memset(&event, 0, sizeof(struct epoll_event));
228
0
    event.events = EPOLLIN;
229
0
    err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, el->selfpipe[0], &event);
230
0
    if(err != 0) {
231
0
        UA_LOG_SOCKET_ERRNO_WRAP(
232
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
233
0
                          "Eventloop\t| Could not register the self-pipe for epoll (%s)",
234
0
                          errno_str));
235
0
        UA_close(el->selfpipe[0]);
236
0
        UA_close(el->selfpipe[1]);
237
0
        close(el->epollfd);
238
0
        UA_UNLOCK(&el->elMutex);
239
0
        return UA_STATUSCODE_BADINTERNALERROR;
240
0
    }
241
0
#endif
242
243
    /* Start the EventSources */
244
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
245
0
    UA_EventSource *es = el->eventLoop.eventSources;
246
0
    while(es) {
247
0
        res |= es->start(es);
248
0
        es = es->next;
249
0
    }
250
251
    /* Dirty-write the state that is const "from the outside" */
252
0
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
253
0
        UA_EVENTLOOPSTATE_STARTED;
254
255
0
    UA_UNLOCK(&el->elMutex);
256
0
    return res;
257
0
}
258
259
static void
260
0
checkClosed(UA_EventLoopPOSIX *el) {
261
0
    UA_LOCK_ASSERT(&el->elMutex);
262
263
0
    UA_EventSource *es = el->eventLoop.eventSources;
264
0
    while(es) {
265
0
        if(es->state != UA_EVENTSOURCESTATE_STOPPED)
266
0
            return;
267
0
        es = es->next;
268
0
    }
269
270
    /* Not closed until all delayed callbacks are processed */
271
0
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
272
0
        return;
273
274
    /* Close the self-pipe when everything else is done */
275
0
    UA_close(el->selfpipe[0]);
276
0
    UA_close(el->selfpipe[1]);
277
278
    /* Dirty-write the state that is const "from the outside" */
279
0
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
280
0
        UA_EVENTLOOPSTATE_STOPPED;
281
282
    /* Close the epoll/IOCP socket once all EventSources have shut down */
283
0
#ifdef UA_HAVE_EPOLL
284
0
    UA_close(el->epollfd);
285
0
#endif
286
287
0
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
288
0
                 "The EventLoop has stopped");
289
0
}
290
291
static void
292
0
UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) {
293
0
    UA_LOCK(&el->elMutex);
294
295
0
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) {
296
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
297
0
                       "The EventLoop is not running, cannot be stopped");
298
0
        UA_UNLOCK(&el->elMutex);
299
0
        return;
300
0
    }
301
302
0
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
303
0
                 "Stopping the EventLoop");
304
305
    /* Set to STOPPING to prevent "normal use" */
306
0
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
307
0
        UA_EVENTLOOPSTATE_STOPPING;
308
309
    /* Stop all event sources (asynchronous) */
310
0
    UA_EventSource *es = el->eventLoop.eventSources;
311
0
    for(; es; es = es->next) {
312
0
        if(es->state == UA_EVENTSOURCESTATE_STARTING ||
313
0
           es->state == UA_EVENTSOURCESTATE_STARTED) {
314
0
            es->stop(es);
315
0
        }
316
0
    }
317
318
    /* Set to STOPPED if all EventSources are STOPPED */
319
0
    checkClosed(el);
320
321
0
    UA_UNLOCK(&el->elMutex);
322
0
}
323
324
static UA_StatusCode
325
0
UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
326
0
    UA_LOCK(&el->elMutex);
327
328
0
    if(el->executing) {
329
0
        UA_LOG_ERROR(el->eventLoop.logger,
330
0
                     UA_LOGCATEGORY_EVENTLOOP,
331
0
                     "Cannot run EventLoop from the run method itself");
332
0
        UA_UNLOCK(&el->elMutex);
333
0
        return UA_STATUSCODE_BADINTERNALERROR;
334
0
    }
335
336
0
    el->executing = true;
337
338
0
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH ||
339
0
       el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) {
340
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
341
0
                       "Cannot run a stopped EventLoop");
342
0
        el->executing = false;
343
0
        UA_UNLOCK(&el->elMutex);
344
0
        return UA_STATUSCODE_BADINTERNALERROR;
345
0
    }
346
347
0
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
348
0
                 "Iterate the EventLoop");
349
350
    /* Process cyclic callbacks */
351
0
    UA_DateTime dateBefore =
352
0
        el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
353
354
0
    UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore);
355
356
    /* Process delayed callbacks here:
357
     * - Removes closed sockets already here instead of polling them again.
358
     * - The timeout for polling is selected to be ready in time for the next
359
     *   cyclic callback. So we want to do little work between the timeout
360
     *   running out and executing the due cyclic callbacks. */
361
0
    processDelayed(el);
362
363
    /* A delayed callback could create another delayed callback (or re-add
364
     * itself). In that case we don't want to wait (indefinitely) for an event
365
     * to happen. Process queued events but don't sleep. Then process the
366
     * delayed callbacks in the next iteration. */
367
0
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
368
0
        timeout = 0;
369
370
    /* Compute the remaining time */
371
0
    UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC);
372
0
    if(dateNext > maxDate)
373
0
        dateNext = maxDate;
374
0
    UA_DateTime listenTimeout =
375
0
        dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
376
0
    if(listenTimeout < 0)
377
0
        listenTimeout = 0;
378
379
    /* Listen on the active file-descriptors (sockets) from the
380
     * ConnectionManagers */
381
0
    UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout);
382
383
    /* Check if the last EventSource was successfully stopped */
384
0
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING)
385
0
        checkClosed(el);
386
387
0
    el->executing = false;
388
0
    UA_UNLOCK(&el->elMutex);
389
0
    return rv;
390
0
}
391
392
/*****************************/
393
/* Registering Event Sources */
394
/*****************************/
395
396
static UA_StatusCode
397
UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el,
398
0
                                      UA_EventSource *es) {
399
0
    UA_LOCK(&el->elMutex);
400
401
    /* Already registered? */
402
0
    if(es->state != UA_EVENTSOURCESTATE_FRESH) {
403
0
        UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
404
0
                     "Cannot register the EventSource \"%.*s\": "
405
0
                     "already registered",
406
0
                     (int)es->name.length, (char*)es->name.data);
407
0
        UA_UNLOCK(&el->elMutex);
408
0
        return UA_STATUSCODE_BADINTERNALERROR;
409
0
    }
410
411
    /* Add to linked list */
412
0
    es->next = el->eventLoop.eventSources;
413
0
    el->eventLoop.eventSources = es;
414
415
0
    es->eventLoop = &el->eventLoop;
416
0
    es->state = UA_EVENTSOURCESTATE_STOPPED;
417
418
    /* Start if the entire EventLoop is started */
419
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
420
0
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED)
421
0
        res = es->start(es);
422
423
0
    UA_UNLOCK(&el->elMutex);
424
0
    return res;
425
0
}
426
427
static UA_StatusCode
428
UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el,
429
0
                                        UA_EventSource *es) {
430
0
    UA_LOCK(&el->elMutex);
431
432
0
    if(es->state != UA_EVENTSOURCESTATE_STOPPED) {
433
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
434
0
                       "Cannot deregister the EventSource %.*s: "
435
0
                       "Has to be stopped first",
436
0
                       (int)es->name.length, es->name.data);
437
0
        UA_UNLOCK(&el->elMutex);
438
0
        return UA_STATUSCODE_BADINTERNALERROR;
439
0
    }
440
441
    /* Remove from the linked list */
442
0
    UA_EventSource **s = &el->eventLoop.eventSources;
443
0
    while(*s) {
444
0
        if(*s == es) {
445
0
            *s = es->next;
446
0
            break;
447
0
        }
448
0
        s = &(*s)->next;
449
0
    }
450
451
    /* Set the state to non-registered */
452
0
    es->state = UA_EVENTSOURCESTATE_FRESH;
453
454
0
    UA_UNLOCK(&el->elMutex);
455
0
    return UA_STATUSCODE_GOOD;
456
0
}
457
458
/***************/
459
/* Time Domain */
460
/***************/
461
462
static UA_DateTime
463
0
UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) {
464
0
#if defined(UA_ARCHITECTURE_POSIX)
465
0
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
466
0
    struct timespec ts;
467
0
    int res = clock_gettime(pel->clockSource, &ts);
468
0
    if(UA_UNLIKELY(res != 0))
469
0
        return 0;
470
0
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
471
#else
472
    return UA_DateTime_now();
473
#endif
474
0
}
475
476
static UA_DateTime
477
0
UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) {
478
0
#if defined(UA_ARCHITECTURE_POSIX)
479
0
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
480
0
    struct timespec ts;
481
0
    int res = clock_gettime(pel->clockSourceMonotonic, &ts);
482
0
    if(UA_UNLIKELY(res != 0))
483
0
        return 0;
484
    /* Also add the unix epoch for the monotonic clock. So we get a "normal"
485
     * output when a "normal" source is configured. */
486
0
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
487
#else
488
    return UA_DateTime_nowMonotonic();
489
#endif
490
0
}
491
492
static UA_Int64
493
0
UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) {
494
    /* TODO: Fix for custom clock sources */
495
0
    return UA_DateTime_localTimeUtcOffset();
496
0
}
497
498
/*************************/
499
/* Initialize and Delete */
500
/*************************/
501
502
static UA_StatusCode
503
0
UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) {
504
0
    UA_LOCK(&el->elMutex);
505
506
    /* Check if the EventLoop can be deleted */
507
0
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED &&
508
0
       el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) {
509
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
510
0
                       "Cannot delete a running EventLoop");
511
0
        UA_UNLOCK(&el->elMutex);
512
0
        return UA_STATUSCODE_BADINTERNALERROR;
513
0
    }
514
515
    /* Deregister and delete all the EventSources */
516
0
    while(el->eventLoop.eventSources) {
517
0
        UA_EventSource *es = el->eventLoop.eventSources;
518
0
        UA_EventLoopPOSIX_deregisterEventSource(el, es);
519
0
        es->free(es);
520
0
    }
521
522
    /* Remove the repeated timed callbacks */
523
0
    UA_Timer_clear(&el->timer);
524
525
    /* Process remaining delayed callbacks */
526
0
    processDelayed(el);
527
528
#ifdef UA_ARCHITECTURE_WIN32
529
    /* Stop the Windows networking subsystem */
530
    WSACleanup();
531
#endif
532
533
0
    UA_KeyValueMap_clear(&el->eventLoop.params);
534
535
    /* Clean up */
536
0
    UA_UNLOCK(&el->elMutex);
537
0
    UA_LOCK_DESTROY(&el->elMutex);
538
0
    UA_free(el);
539
0
    return UA_STATUSCODE_GOOD;
540
0
}
541
542
static void
543
0
UA_EventLoopPOSIX_lock(UA_EventLoop *public_el) {
544
0
    UA_LOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
545
0
}
546
static void
547
0
UA_EventLoopPOSIX_unlock(UA_EventLoop *public_el) {
548
0
    UA_UNLOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
549
0
}
550
551
UA_EventLoop *
552
0
UA_EventLoop_new_POSIX(const UA_Logger *logger) {
553
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)
554
0
        UA_calloc(1, sizeof(UA_EventLoopPOSIX));
555
0
    if(!el)
556
0
        return NULL;
557
558
0
    UA_LOCK_INIT(&el->elMutex);
559
0
    UA_Timer_init(&el->timer);
560
561
    /* Initialize the queue */
562
0
    el->delayedTail = &el->delayedHead1;
563
0
    el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */
564
565
#ifdef UA_ARCHITECTURE_WIN32
566
    /* Start the WSA networking subsystem on Windows */
567
    WSADATA wsaData;
568
    WSAStartup(MAKEWORD(2, 2), &wsaData);
569
#endif
570
571
    /* Set the public EventLoop content */
572
0
    el->eventLoop.logger = logger;
573
574
    /* Initialize the clock source to the default */
575
0
#if defined(UA_ARCHITECTURE_POSIX)
576
0
    el->clockSource = CLOCK_REALTIME;
577
0
# ifdef CLOCK_MONOTONIC_RAW
578
0
    el->clockSourceMonotonic = CLOCK_MONOTONIC_RAW;
579
# else
580
    el->clockSourceMonotonic = CLOCK_MONOTONIC;
581
# endif
582
0
#endif
583
584
    /* Set the method pointers for the interface */
585
0
    el->eventLoop.start = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start;
586
0
    el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop;
587
0
    el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free;
588
0
    el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run;
589
0
    el->eventLoop.cancel = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_cancel;
590
591
0
    el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now;
592
0
    el->eventLoop.dateTime_nowMonotonic =
593
0
        UA_EventLoopPOSIX_DateTime_nowMonotonic;
594
0
    el->eventLoop.dateTime_localTimeUtcOffset =
595
0
        UA_EventLoopPOSIX_DateTime_localTimeUtcOffset;
596
597
0
    el->eventLoop.nextTimer = UA_EventLoopPOSIX_nextTimer;
598
0
    el->eventLoop.addTimer = UA_EventLoopPOSIX_addTimer;
599
0
    el->eventLoop.modifyTimer = UA_EventLoopPOSIX_modifyTimer;
600
0
    el->eventLoop.removeTimer = UA_EventLoopPOSIX_removeTimer;
601
0
    el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback;
602
0
    el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback;
603
604
0
    el->eventLoop.registerEventSource =
605
0
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
606
0
        UA_EventLoopPOSIX_registerEventSource;
607
0
    el->eventLoop.deregisterEventSource =
608
0
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
609
0
        UA_EventLoopPOSIX_deregisterEventSource;
610
611
0
    el->eventLoop.lock = UA_EventLoopPOSIX_lock;
612
0
    el->eventLoop.unlock = UA_EventLoopPOSIX_unlock;
613
614
0
    return &el->eventLoop;
615
0
}
616
617
/***************************/
618
/* Network Buffer Handling */
619
/***************************/
620
621
UA_StatusCode
622
UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm,
623
                                     uintptr_t connectionId,
624
                                     UA_ByteString *buf,
625
0
                                     size_t bufSize) {
626
0
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
627
0
    if(pcm->txBuffer.length == 0)
628
0
        return UA_ByteString_allocBuffer(buf, bufSize);
629
0
    if(pcm->txBuffer.length < bufSize)
630
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
631
0
    *buf = pcm->txBuffer;
632
0
    buf->length = bufSize;
633
0
    return UA_STATUSCODE_GOOD;
634
0
}
635
636
void
637
UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm,
638
                                    uintptr_t connectionId,
639
0
                                    UA_ByteString *buf) {
640
0
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
641
0
    if(pcm->txBuffer.data == buf->data)
642
0
        UA_ByteString_init(buf);
643
0
    else
644
0
        UA_ByteString_clear(buf);
645
0
}
646
647
UA_StatusCode
648
0
UA_EventLoopPOSIX_allocateStaticBuffers(UA_POSIXConnectionManager *pcm) {
649
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
650
0
    UA_UInt32 rxBufSize = 2u << 16; /* The default is 64kb */
651
0
    const UA_UInt32 *configRxBufSize = (const UA_UInt32 *)
652
0
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
653
0
                                 UA_QUALIFIEDNAME(0, "recv-bufsize"),
654
0
                                 &UA_TYPES[UA_TYPES_UINT32]);
655
0
    if(configRxBufSize)
656
0
        rxBufSize = *configRxBufSize;
657
0
    if(pcm->rxBuffer.length != rxBufSize) {
658
0
        UA_ByteString_clear(&pcm->rxBuffer);
659
0
        res = UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize);
660
0
    }
661
662
0
    const UA_UInt32 *txBufSize = (const UA_UInt32 *)
663
0
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
664
0
                                 UA_QUALIFIEDNAME(0, "send-bufsize"),
665
0
                                 &UA_TYPES[UA_TYPES_UINT32]);
666
0
    if(txBufSize && pcm->txBuffer.length != *txBufSize) {
667
0
        UA_ByteString_clear(&pcm->txBuffer);
668
0
        res |= UA_ByteString_allocBuffer(&pcm->txBuffer, *txBufSize);
669
0
    }
670
0
    return res;
671
0
}
672
673
/******************/
674
/* Socket Options */
675
/******************/
676
677
enum ZIP_CMP
678
0
cmpFD(const UA_FD *a, const UA_FD *b) {
679
0
    if(*a == *b)
680
0
        return ZIP_CMP_EQ;
681
0
    return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE;
682
0
}
683
684
UA_StatusCode
685
0
UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) {
686
0
#ifndef UA_ARCHITECTURE_WIN32
687
0
    int opts = fcntl(sockfd, F_GETFL);
688
0
    if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0)
689
0
        return UA_STATUSCODE_BADINTERNALERROR;
690
#else
691
    u_long iMode = 1;
692
    if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
693
        return UA_STATUSCODE_BADINTERNALERROR;
694
#endif
695
0
    return UA_STATUSCODE_GOOD;
696
0
}
697
698
UA_StatusCode
699
0
UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) {
700
#ifdef SO_NOSIGPIPE
701
    int val = 1;
702
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val));
703
    if(res < 0)
704
        return UA_STATUSCODE_BADINTERNALERROR;
705
#endif
706
0
    return UA_STATUSCODE_GOOD;
707
0
}
708
709
UA_StatusCode
710
0
UA_EventLoopPOSIX_setReusable(UA_FD sockfd) {
711
0
    int enableReuseVal = 1;
712
0
#ifndef UA_ARCHITECTURE_WIN32
713
0
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
714
0
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
715
0
    res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
716
0
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
717
0
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
718
#else
719
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
720
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
721
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
722
#endif
723
0
}
724
725
/************************/
726
/* Select / epoll Logic */
727
/************************/
728
729
/* Re-arm the self-pipe socket for the next signal by reading from it */
730
static void
731
0
flushSelfPipe(UA_SOCKET s) {
732
0
    char buf[128];
733
#ifdef UA_ARCHITECTURE_WIN32
734
    recv(s, buf, 128, 0);
735
#else
736
0
    ssize_t i;
737
0
    do {
738
0
        i = read(s, buf, 128);
739
0
    } while(i > 0);
740
0
#endif
741
0
}
742
743
#if !defined(UA_HAVE_EPOLL)
744
745
UA_StatusCode
746
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
747
    UA_LOCK_ASSERT(&el->elMutex);
748
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
749
                 "Registering fd: %u", (unsigned)rfd->fd);
750
751
    /* Realloc */
752
    UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
753
        UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * (el->fdsSize + 1));
754
    if(!fds_tmp) {
755
        return UA_STATUSCODE_BADOUTOFMEMORY;
756
    }
757
    el->fds = fds_tmp;
758
759
    /* Add to the last entry */
760
    el->fds[el->fdsSize] = rfd;
761
    el->fdsSize++;
762
    return UA_STATUSCODE_GOOD;
763
}
764
765
UA_StatusCode
766
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
767
    /* Do nothing, it is enough if the data was changed in the rfd */
768
    UA_LOCK_ASSERT(&el->elMutex);
769
    return UA_STATUSCODE_GOOD;
770
}
771
772
void
773
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
774
    UA_LOCK_ASSERT(&el->elMutex);
775
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
776
                 "Unregistering fd: %u", (unsigned)rfd->fd);
777
778
    /* Find the entry */
779
    size_t i = 0;
780
    for(; i < el->fdsSize; i++) {
781
        if(el->fds[i] == rfd)
782
            break;
783
    }
784
785
    /* Not found? */
786
    if(i == el->fdsSize)
787
        return;
788
789
    if(el->fdsSize > 1) {
790
        /* Move the last entry in the ith slot and realloc. */
791
        el->fdsSize--;
792
        el->fds[i] = el->fds[el->fdsSize];
793
        UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
794
            UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * el->fdsSize);
795
        /* if realloc fails the fds are still in a correct state with
796
         * possibly lost memory, so failing silently here is ok */
797
        if(fds_tmp)
798
            el->fds = fds_tmp;
799
    } else {
800
        /* Remove the last entry */
801
        UA_free(el->fds);
802
        el->fds = NULL;
803
        el->fdsSize = 0;
804
    }
805
}
806
807
static UA_FD
808
setFDSets(UA_EventLoopPOSIX *el, fd_set *readset, fd_set *writeset, fd_set *errset) {
809
    UA_LOCK_ASSERT(&el->elMutex);
810
811
    FD_ZERO(readset);
812
    FD_ZERO(writeset);
813
    FD_ZERO(errset);
814
815
    /* Always listen on the read-end of the pipe */
816
    UA_FD highestfd = el->selfpipe[0];
817
    FD_SET(el->selfpipe[0], readset);
818
819
    for(size_t i = 0; i < el->fdsSize; i++) {
820
        UA_FD currentFD = el->fds[i]->fd;
821
822
        /* Add to the fd_sets */
823
        if(el->fds[i]->listenEvents & UA_FDEVENT_IN)
824
            FD_SET(currentFD, readset);
825
        if(el->fds[i]->listenEvents & UA_FDEVENT_OUT)
826
            FD_SET(currentFD, writeset);
827
828
        /* Always return errors */
829
        FD_SET(currentFD, errset);
830
831
        /* Highest fd? */
832
        if(currentFD > highestfd)
833
            highestfd = currentFD;
834
    }
835
    return highestfd;
836
}
837
838
UA_StatusCode
839
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
840
    UA_assert(listenTimeout >= 0);
841
    UA_LOCK_ASSERT(&el->elMutex);
842
843
    fd_set readset, writeset, errset;
844
    UA_FD highestfd = setFDSets(el, &readset, &writeset, &errset);
845
846
    /* Nothing to do? */
847
    if(highestfd == UA_INVALID_FD) {
848
        UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
849
                     "No valid FDs for processing");
850
        return UA_STATUSCODE_GOOD;
851
    }
852
853
    struct timeval tmptv = {
854
#ifndef UA_ARCHITECTURE_WIN32
855
        (time_t)(listenTimeout / UA_DATETIME_SEC),
856
        (suseconds_t)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
857
#else
858
        (long)(listenTimeout / UA_DATETIME_SEC),
859
        (long)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
860
#endif
861
    };
862
863
    UA_UNLOCK(&el->elMutex);
864
    int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
865
    UA_LOCK(&el->elMutex);
866
    if(selectStatus < 0) {
867
        /* We will retry, only log the error */
868
        UA_LOG_SOCKET_ERRNO_WRAP(
869
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
870
                           "Error during select: %s", errno_str));
871
        return UA_STATUSCODE_GOOD;
872
    }
873
874
    /* The self-pipe has received. Clear the buffer by reading. */
875
    if(UA_UNLIKELY(FD_ISSET(el->selfpipe[0], &readset)))
876
        flushSelfPipe(el->selfpipe[0]);
877
878
    /* Loop over all registered FD to see if an event arrived. Yes, this is why
879
     * select is slow for many open sockets. */
880
    for(size_t i = 0; i < el->fdsSize; i++) {
881
        UA_RegisteredFD *rfd = el->fds[i];
882
883
        /* The rfd is already registered for removal. Don't process incoming
884
         * events any longer. */
885
        if(rfd->dc.callback)
886
            continue;
887
888
        /* Event signaled for the fd? */
889
        short event = 0;
890
        if(FD_ISSET(rfd->fd, &readset)) {
891
            event = UA_FDEVENT_IN;
892
        } else if(FD_ISSET(rfd->fd, &writeset)) {
893
            event = UA_FDEVENT_OUT;
894
        } else if(FD_ISSET(rfd->fd, &errset)) {
895
            event = UA_FDEVENT_ERR;
896
        } else {
897
            continue;
898
        }
899
900
        UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
901
                     "Processing event %u on fd %u", (unsigned)event,
902
                     (unsigned)rfd->fd);
903
904
        /* Call the EventSource callback */
905
        rfd->eventSourceCB(rfd->es, rfd, event);
906
907
        /* The fd has removed itself */
908
        if(i == el->fdsSize || rfd != el->fds[i])
909
            i--;
910
    }
911
    return UA_STATUSCODE_GOOD;
912
}
913
914
#else /* defined(UA_HAVE_EPOLL) */
915
916
UA_StatusCode
917
0
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
918
0
    struct epoll_event event;
919
0
    memset(&event, 0, sizeof(struct epoll_event));
920
0
    event.data.ptr = rfd;
921
0
    event.events = 0;
922
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
923
0
        event.events |= EPOLLIN;
924
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
925
0
        event.events |= EPOLLOUT;
926
927
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, rfd->fd, &event);
928
0
    if(err != 0) {
929
0
        UA_LOG_SOCKET_ERRNO_WRAP(
930
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
931
0
                          "TCP %u\t| Could not register for epoll (%s)",
932
0
                          rfd->fd, errno_str));
933
0
        return UA_STATUSCODE_BADINTERNALERROR;
934
0
    }
935
0
    return UA_STATUSCODE_GOOD;
936
0
}
937
938
UA_StatusCode
939
0
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
940
0
    struct epoll_event event;
941
0
    memset(&event, 0, sizeof(struct epoll_event));
942
0
    event.data.ptr = rfd;
943
0
    event.events = 0;
944
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
945
0
        event.events |= EPOLLIN;
946
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
947
0
        event.events |= EPOLLOUT;
948
949
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_MOD, rfd->fd, &event);
950
0
    if(err != 0) {
951
0
        UA_LOG_SOCKET_ERRNO_WRAP(
952
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
953
0
                          "TCP %u\t| Could not modify for epoll (%s)",
954
0
                          rfd->fd, errno_str));
955
0
        return UA_STATUSCODE_BADINTERNALERROR;
956
0
    }
957
0
    return UA_STATUSCODE_GOOD;
958
0
}
959
960
void
961
0
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
962
0
    int res = epoll_ctl(el->epollfd, EPOLL_CTL_DEL, rfd->fd, NULL);
963
0
    if(res != 0) {
964
0
        UA_LOG_SOCKET_ERRNO_WRAP(
965
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
966
0
                          "TCP %u\t| Could not deregister from epoll (%s)",
967
0
                          rfd->fd, errno_str));
968
0
    }
969
0
}
970
971
UA_StatusCode
972
0
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
973
0
    UA_assert(listenTimeout >= 0);
974
975
    /* If there is a positive timeout, wait at least one millisecond, the
976
     * minimum for blocking epoll_wait. This prevents a busy-loop, as the
977
     * open62541 library allows even smaller timeouts, which can result in a
978
     * zero timeout due to rounding to an integer here. */
979
0
    int timeout = (int)(listenTimeout / UA_DATETIME_MSEC);
980
0
    if(timeout == 0 && listenTimeout > 0)
981
0
        timeout = 1;
982
983
    /* Poll the registered sockets */
984
0
    struct epoll_event epoll_events[64];
985
0
    UA_UNLOCK(&el->elMutex);
986
0
    int events = epoll_wait(el->epollfd, epoll_events, 64, timeout);
987
0
    UA_LOCK(&el->elMutex);
988
989
    /* TODO: Replace with pwait2 for higher-precision timeouts once this is
990
     * available in the standard library.
991
     *
992
     * struct timespec precisionTimeout = {
993
     *  (long)(listenTimeout / UA_DATETIME_SEC),
994
     *   (long)((listenTimeout % UA_DATETIME_SEC) * 100)
995
     * };
996
     * int events = epoll_pwait2(epollfd, epoll_events, 64,
997
     *                        precisionTimeout, NULL); */
998
999
    /* Handle error conditions */
1000
0
    if(events == -1) {
1001
0
        if(errno == EINTR) {
1002
            /* We will retry, only log the error */
1003
0
            UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
1004
0
                         "Timeout during poll");
1005
0
            return UA_STATUSCODE_GOOD;
1006
0
        }
1007
0
        UA_LOG_SOCKET_ERRNO_WRAP(
1008
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
1009
0
                          "TCP\t| Error %s, closing the server socket",
1010
0
                          errno_str));
1011
0
        return UA_STATUSCODE_BADINTERNALERROR;
1012
0
    }
1013
1014
    /* Process all received events */
1015
0
    for(int i = 0; i < events; i++) {
1016
0
        UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr;
1017
1018
        /* The self-pipe has received */
1019
0
        if(!rfd) {
1020
0
            flushSelfPipe(el->selfpipe[0]);
1021
0
            continue;
1022
0
        }
1023
1024
        /* The rfd is already registered for removal. Don't process incoming
1025
         * events any longer. */
1026
0
        if(rfd->dc.callback)
1027
0
            continue;
1028
1029
        /* Get the event */
1030
0
        short revent = 0;
1031
0
        if((epoll_events[i].events & EPOLLIN) == EPOLLIN) {
1032
0
            revent = UA_FDEVENT_IN;
1033
0
        } else if((epoll_events[i].events & EPOLLOUT) == EPOLLOUT) {
1034
0
            revent = UA_FDEVENT_OUT;
1035
0
        } else {
1036
0
            revent = UA_FDEVENT_ERR;
1037
0
        }
1038
1039
        /* Call the EventSource callback */
1040
0
        rfd->eventSourceCB(rfd->es, rfd, revent);
1041
0
    }
1042
0
    return UA_STATUSCODE_GOOD;
1043
0
}
1044
1045
#endif /* defined(UA_HAVE_EPOLL) */
1046
1047
#if defined(UA_ARCHITECTURE_WIN32) || defined(__APPLE__)
1048
/* Close a socket that was opened for the pipe emulation */
static void
closePipeSocket(SOCKET sock) {
#ifdef UA_ARCHITECTURE_WIN32
    closesocket(sock);
#else
    close(sock);
#endif
}

/* Emulate a pipe with a pair of connected TCP sockets on the loopback
 * interface: a temporary listen-socket is bound to an ephemeral port, fds[0]
 * connects to it and fds[1] is the accepted peer. The listen-socket is closed
 * again before returning.
 *
 * Returns 0 on success. On any failure, every socket opened so far is closed,
 * both fds are set to UA_INVALID_FD and -1 is returned. */
int UA_EventLoopPOSIX_pipe(SOCKET fds[2]) {
    fds[0] = UA_INVALID_FD;
    fds[1] = UA_INVALID_FD;

    /* Loopback address with a port selected by the system */
    struct sockaddr_in inaddr;
    memset(&inaddr, 0, sizeof(inaddr));
    inaddr.sin_family = AF_INET;
    inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    inaddr.sin_port = 0;

    struct sockaddr_storage addr;
    memset(&addr, 0, sizeof(addr));
    int len = sizeof(addr);

    SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if(lst == UA_INVALID_FD)
        return -1;

    /* Listen and look up the port that was actually assigned */
    if(bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr)) != 0 ||
       listen(lst, 1) != 0 ||
       getsockname(lst, (struct sockaddr*)&addr, &len) != 0)
        goto error;

    /* Connect the first end. Don't continue to the (blocking) accept if the
     * connect failed, there would be nothing to accept. */
    fds[0] = socket(AF_INET, SOCK_STREAM, 0);
    if(fds[0] == UA_INVALID_FD)
        goto error;
    if(connect(fds[0], (struct sockaddr*)&addr, len) != 0)
        goto error;

    /* The accepted connection is the second end */
    fds[1] = accept(lst, 0, 0);
    if(fds[1] == UA_INVALID_FD)
        goto error;

    /* The listen-socket is no longer needed */
    closePipeSocket(lst);

    /* Socket options are applied on a best-effort basis */
    UA_EventLoopPOSIX_setNoSigPipe(fds[0]);
    UA_EventLoopPOSIX_setReusable(fds[0]);
    UA_EventLoopPOSIX_setNonBlocking(fds[0]);
    UA_EventLoopPOSIX_setNoSigPipe(fds[1]);
    UA_EventLoopPOSIX_setReusable(fds[1]);
    UA_EventLoopPOSIX_setNonBlocking(fds[1]);
    return 0;

 error:
    /* fds[1] is never open when arriving here */
    closePipeSocket(lst);
    if(fds[0] != UA_INVALID_FD) {
        closePipeSocket(fds[0]);
        fds[0] = UA_INVALID_FD;
    }
    return -1;
}
1082
#elif defined(__QNX__)
1083
/* Switch a descriptor to non-blocking mode. The other file status flags are
 * kept. Returns 0 on success and -1 on failure (errno is set by fcntl). */
static int
setPipeNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if(flags == -1)
        return -1;
    if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
        return -1;
    return 0;
}

/* Create a pipe with both ends in non-blocking mode. fds[0] is the read-end,
 * fds[1] is the write-end.
 *
 * Returns 0 on success. On failure -1 is returned with errno describing the
 * failed call. Descriptors that were already opened get closed and the fds are
 * reset to -1. */
int UA_EventLoopPOSIX_pipe(int fds[2]) {
    if(pipe(fds) == -1)
        return -1;

    /* The read-end must not block when the pipe gets drained. The write-end
     * must not block the signaling thread when the pipe is full. */
    if(setPipeNonBlocking(fds[0]) == -1 ||
       setPipeNonBlocking(fds[1]) == -1) {
        int savedErrno = errno; /* close may overwrite errno */
        close(fds[0]);
        close(fds[1]);
        fds[0] = -1;
        fds[1] = -1;
        errno = savedErrno;
        return -1;
    }
    return 0;
}
1095
#endif
1096
1097
void
1098
0
UA_EventLoopPOSIX_cancel(UA_EventLoopPOSIX *el) {
1099
    /* Nothing to do if the EventLoop is not executing */
1100
0
    if(!el->executing)
1101
0
        return;
1102
1103
    /* Trigger the self-pipe */
1104
#ifdef UA_ARCHITECTURE_WIN32
1105
    int err = send(el->selfpipe[1], ".", 1, 0);
1106
#else
1107
0
    ssize_t err = write(el->selfpipe[1], ".", 1);
1108
0
#endif
1109
0
    if(err <= 0) {
1110
        UA_LOG_SOCKET_ERRNO_WRAP(
1111
0
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
1112
0
                           "Eventloop\t| Error signaling self-pipe (%s)", errno_str));
1113
0
    }
1114
0
}
1115
1116
#endif