Coverage Report

Created: 2025-07-12 06:53

/src/open62541/arch/posix/eventloop_posix.c
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes)
7
 */
8
9
#include "eventloop_posix.h"
10
#include "open62541/plugin/eventloop.h"
11
12
#if defined(UA_ARCHITECTURE_POSIX) && !defined(UA_ARCHITECTURE_LWIP) || defined(UA_ARCHITECTURE_WIN32)
13
14
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
15
#include <time.h>
16
#endif
17
18
/*********/
19
/* Timer */
20
/*********/
21
22
/* Returns the monotonic time of the next due event. If delayed callbacks are
 * pending (one of the queue heads holds a real pointer, i.e. > the 0x01
 * sentinel), the next event is "now" so the caller does not sleep.
 * Otherwise defer to the timer tree. */
static UA_DateTime
UA_EventLoopPOSIX_nextTimer(UA_EventLoop *public_el) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    if(el->delayedHead1 > (UA_DelayedCallback *)0x01 ||
       el->delayedHead2 > (UA_DelayedCallback *)0x01)
        return el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
    return UA_Timer_next(&el->timer);
}
30
31
static UA_StatusCode
32
UA_EventLoopPOSIX_addTimer(UA_EventLoop *public_el, UA_Callback cb,
33
                           void *application, void *data, UA_Double interval_ms,
34
                           UA_DateTime *baseTime, UA_TimerPolicy timerPolicy,
35
0
                           UA_UInt64 *callbackId) {
36
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
37
0
    return UA_Timer_add(&el->timer, cb, application, data, interval_ms,
38
0
                        public_el->dateTime_nowMonotonic(public_el),
39
0
                        baseTime, timerPolicy, callbackId);
40
0
}
41
42
static UA_StatusCode
43
UA_EventLoopPOSIX_modifyTimer(UA_EventLoop *public_el,
44
                              UA_UInt64 callbackId,
45
                              UA_Double interval_ms,
46
                              UA_DateTime *baseTime,
47
0
                              UA_TimerPolicy timerPolicy) {
48
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
49
0
    return UA_Timer_modify(&el->timer, callbackId, interval_ms,
50
0
                           public_el->dateTime_nowMonotonic(public_el),
51
0
                           baseTime, timerPolicy);
52
0
}
53
54
static void
55
UA_EventLoopPOSIX_removeTimer(UA_EventLoop *public_el,
56
0
                              UA_UInt64 callbackId) {
57
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
58
0
    UA_Timer_remove(&el->timer, callbackId);
59
0
}
60
61
/* Multi-producer enqueue into the lock-free delayed-callback queue. May be
 * called from any thread; does not take elMutex. The caller retains ownership
 * of dc until it is processed. */
void
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
                                     UA_DelayedCallback *dc) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    dc->next = NULL;

    /* el->delayedTail points either to prev->next or to the head.
     * We need to update two locations:
     * 1: el->delayedTail = &dc->next;
     * 2: *oldtail = dc; (equal to &dc->next)
     *
     * Once we have (1), we "own" the previous-to-last entry. No need to worry
     * about (2), we can adjust it with a delay. This makes the queue
     * "eventually consistent". */
    UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
        UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
    /* NOTE(review): storing &dc->next instead of dc relies on the two
     * addresses being identical, i.e. on `next` presumably being the first
     * member of UA_DelayedCallback (the comment above asserts the equality)
     * -- confirm against the struct definition. */
    UA_atomic_xchg((void**)oldtail, &dc->next);
}
79
80
/* Resets the delayed queue and returns the previous head and tail.
 *
 * The queue uses two head slots: at any time one is "active" (holds NULL or
 * the list) and the other holds the 0x01 sentinel. A head value <= 0x01
 * (NULL or sentinel) means that slot carries no entries. oldHead/oldTail are
 * left untouched when the queue is empty -- callers must pre-initialize them
 * to NULL. */
static void
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
                  UA_DelayedCallback **oldTail) {
    if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
       el->delayedHead2 <= (UA_DelayedCallback *)0x01)
        return; /* The queue is empty */

    /* The head that does NOT hold the sentinel is the active one */
    UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
    UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
    UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;

    /* Switch active/inactive by resetting the sentinel values. The (old) active
     * head points to an element which we return. Parallel threads continue to
     * add elements to the queue "below" the first element. */
    UA_atomic_xchg((void**)inactiveHead, NULL);
    *oldHead = (UA_DelayedCallback *)
        UA_atomic_xchg((void**)activeHead, (void*)0x01);

    /* Make the tail point to the (new) active head. Return the value of last
     * tail. When iterating over the queue elements, we need to find this tail
     * as the last element. If we find a NULL next-pointer before hitting the
     * tail spinlock until the pointer updates (eventually consistent). */
    *oldTail = (UA_DelayedCallback*)
        UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
}
106
107
/* Remove one delayed callback from the queue. Implemented by draining the
 * whole queue and re-enqueueing every entry except dc -- O(n), but removal is
 * rare. Takes elMutex so only one thread drains at a time. */
static void
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
                                        UA_DelayedCallback *dc) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    UA_LOCK(&el->elMutex);

    /* Reset and get the old head and tail */
    UA_DelayedCallback *cur = NULL, *tail = NULL;
    resetDelayedQueue(el, &cur, &tail);

    /* Loop until we reach the tail (or head and tail are both NULL) */
    UA_DelayedCallback *next;
    for(; cur; cur = next) {
        /* Spin-loop until the next-pointer of cur is updated.
         * The element pointed to by tail must appear eventually. */
        next = cur->next;
        while(!next && cur != tail)
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
        if(cur == dc)
            continue; /* Drop the entry to be removed; do not re-add it */
        UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
    }

    UA_UNLOCK(&el->elMutex);
}
132
133
/* Drain the delayed-callback queue and execute every entry. Must be called
 * with elMutex held (asserted below). Entries with a NULL callback are
 * skipped -- they only served as queue placeholders. */
static void
processDelayed(UA_EventLoopPOSIX *el) {
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Process delayed callbacks");

    UA_LOCK_ASSERT(&el->elMutex);

    /* Reset and get the old head and tail */
    UA_DelayedCallback *dc = NULL, *tail = NULL;
    resetDelayedQueue(el, &dc, &tail);

    /* Loop until we reach the tail (or head and tail are both NULL) */
    UA_DelayedCallback *next;
    for(; dc; dc = next) {
        /* Spin until the producer has published dc->next; the tail element
         * must appear eventually (queue is eventually consistent) */
        next = dc->next;
        while(!next && dc != tail)
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
        if(!dc->callback)
            continue;
        /* Read next BEFORE the call: the callback may free dc */
        dc->callback(dc->application, dc->context);
    }
}
155
156
/***********************/
157
/* EventLoop Lifecycle */
158
/***********************/
159
160
/* Transition the EventLoop from FRESH/STOPPED to STARTED: configure the
 * clock sources from the params map, create the self-pipe (used to wake the
 * poll from other threads), create the epoll fd (when available) and start
 * all registered EventSources. Returns BADINTERNALERROR on any setup
 * failure; partial resources are released on the error paths. */
static UA_StatusCode
UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) {
    UA_LOCK(&el->elMutex);

    /* Only a fresh or stopped loop can be (re)started */
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH &&
       el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) {
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Starting the EventLoop");

    /* Setting the clock source: optional "clock-source" /
     * "clock-source-monotonic" Int32 params override the defaults */
    const UA_Int32 *cs = (const UA_Int32*)
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
                                 UA_QUALIFIEDNAME(0, "clock-source"),
                                 &UA_TYPES[UA_TYPES_INT32]);
    const UA_Int32 *csm = (const UA_Int32*)
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
                                 UA_QUALIFIEDNAME(0, "clock-source-monotonic"),
                                 &UA_TYPES[UA_TYPES_INT32]);
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
    el->clockSource = CLOCK_REALTIME;
    if(cs)
        el->clockSource = *cs;

/* Prefer the raw monotonic clock (not subject to NTP slewing) when present */
# ifdef CLOCK_MONOTONIC_RAW
    el->clockSourceMonotonic = CLOCK_MONOTONIC_RAW;
# else
    el->clockSourceMonotonic = CLOCK_MONOTONIC;
# endif
    if(csm)
        el->clockSourceMonotonic = *csm;
#else
    if(cs || csm) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                       "Eventloop\t| Cannot set a custom clock source");
    }
#endif

    /* Create the self-pipe */
    int err = UA_EventLoopPOSIX_pipe(el->selfpipe);
    if(err != 0) {
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "Eventloop\t| Could not create the self-pipe (%s)",
                          errno_str));
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Create the epoll socket */
#ifdef UA_HAVE_EPOLL
    el->epollfd = epoll_create1(0);
    if(el->epollfd == -1) {
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "Eventloop\t| Could not create the epoll socket (%s)",
                          errno_str));
        /* Roll back the self-pipe created above */
        UA_close(el->selfpipe[0]);
        UA_close(el->selfpipe[1]);
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* epoll always listens on the self-pipe. This is the only epoll_event that
     * has a NULL data pointer. */
    struct epoll_event event;
    memset(&event, 0, sizeof(struct epoll_event));
    event.events = EPOLLIN;
    err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, el->selfpipe[0], &event);
    if(err != 0) {
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "Eventloop\t| Could not register the self-pipe for epoll (%s)",
                          errno_str));
        /* Roll back both the self-pipe and the epoll fd */
        UA_close(el->selfpipe[0]);
        UA_close(el->selfpipe[1]);
        close(el->epollfd);
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }
#endif

    /* Start the EventSources; OR the status codes so any failure is
     * reflected in the return value while all sources still get started */
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    UA_EventSource *es = el->eventLoop.eventSources;
    while(es) {
        res |= es->start(es);
        es = es->next;
    }

    /* Dirty-write the state that is const "from the outside" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STARTED;

    UA_UNLOCK(&el->elMutex);
    return res;
}
260
261
/* Finalize the STOPPING -> STOPPED transition once every EventSource has
 * stopped and no delayed callbacks remain. Closes the self-pipe and (with
 * epoll) the epoll fd. Must be called with elMutex held (asserted). */
static void
checkClosed(UA_EventLoopPOSIX *el) {
    UA_LOCK_ASSERT(&el->elMutex);

    UA_EventSource *es = el->eventLoop.eventSources;
    while(es) {
        if(es->state != UA_EVENTSOURCESTATE_STOPPED)
            return;
        es = es->next;
    }

    /* Not closed until all delayed callbacks are processed. One head always
     * holds the 0x01 sentinel, so "both heads non-NULL" is equivalent to
     * "the active head holds entries" (cf. UA_EventLoopPOSIX_nextTimer). */
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
        return;

    /* Close the self-pipe when everything else is done */
    UA_close(el->selfpipe[0]);
    UA_close(el->selfpipe[1]);

    /* Dirty-write the state that is const "from the outside" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STOPPED;

    /* Close the epoll/IOCP socket once all EventSources have shut down */
#ifdef UA_HAVE_EPOLL
    UA_close(el->epollfd);
#endif

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "The EventLoop has stopped");
}
292
293
/* Request a shutdown of a STARTED EventLoop. Sets the state to STOPPING,
 * asks every starting/started EventSource to stop (asynchronously) and
 * finalizes immediately via checkClosed() if they all stopped synchronously.
 * Otherwise the transition completes in a later run() iteration. */
static void
UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) {
    UA_LOCK(&el->elMutex);

    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "The EventLoop is not running, cannot be stopped");
        UA_UNLOCK(&el->elMutex);
        return;
    }

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Stopping the EventLoop");

    /* Set to STOPPING to prevent "normal use" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STOPPING;

    /* Stop all event sources (asynchronous) */
    UA_EventSource *es = el->eventLoop.eventSources;
    for(; es; es = es->next) {
        if(es->state == UA_EVENTSOURCESTATE_STARTING ||
           es->state == UA_EVENTSOURCESTATE_STARTED) {
            es->stop(es);
        }
    }

    /* Set to STOPPED if all EventSources are STOPPED */
    checkClosed(el);

    UA_UNLOCK(&el->elMutex);
}
325
326
/* Execute one iteration of the EventLoop: run due cyclic callbacks, process
 * delayed callbacks, then poll the registered fds for at most `timeout`
 * milliseconds (capped so the next cyclic callback is not missed).
 * Re-entrant calls from inside a callback are rejected. */
static UA_StatusCode
UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
    UA_LOCK(&el->elMutex);

    /* Guard against recursive invocation from a callback */
    if(el->executing) {
        UA_LOG_ERROR(el->eventLoop.logger,
                     UA_LOGCATEGORY_EVENTLOOP,
                     "Cannot run EventLoop from the run method itself");
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    el->executing = true;

    if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH ||
       el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "Cannot run a stopped EventLoop");
        el->executing = false;
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Iterate the EventLoop");

    /* Process cyclic callbacks; UA_Timer_process returns the time of the
     * next due timer */
    UA_DateTime dateBefore =
        el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);

    UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore);

    /* Process delayed callbacks here:
     * - Removes closed sockets already here instead of polling them again.
     * - The timeout for polling is selected to be ready in time for the next
     *   cyclic callback. So we want to do little work between the timeout
     *   running out and executing the due cyclic callbacks. */
    processDelayed(el);

    /* A delayed callback could create another delayed callback (or re-add
     * itself). In that case we don't want to wait (indefinitely) for an event
     * to happen. Process queued events but don't sleep. Then process the
     * delayed callbacks in the next iteration.
     * (Both heads non-NULL == queue non-empty, see checkClosed.) */
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
        timeout = 0;

    /* Compute the remaining time: poll until the next timer or until the
     * caller-supplied timeout, whichever comes first */
    UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC);
    if(dateNext > maxDate)
        dateNext = maxDate;
    UA_DateTime listenTimeout =
        dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
    if(listenTimeout < 0)
        listenTimeout = 0;

    /* Listen on the active file-descriptors (sockets) from the
     * ConnectionManagers */
    UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout);

    /* Check if the last EventSource was successfully stopped */
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING)
        checkClosed(el);

    el->executing = false;
    UA_UNLOCK(&el->elMutex);
    return rv;
}
393
394
/*****************************/
395
/* Registering Event Sources */
396
/*****************************/
397
398
/* Attach a FRESH EventSource to the loop (prepended to the singly-linked
 * list) and start it immediately if the loop is already running. Returns
 * BADINTERNALERROR if the source was registered before. */
static UA_StatusCode
UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el,
                                      UA_EventSource *es) {
    UA_LOCK(&el->elMutex);

    /* Already registered? */
    if(es->state != UA_EVENTSOURCESTATE_FRESH) {
        UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                     "Cannot register the EventSource \"%.*s\": "
                     "already registered",
                     (int)es->name.length, (char*)es->name.data);
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Add to linked list (prepend) */
    es->next = el->eventLoop.eventSources;
    el->eventLoop.eventSources = es;

    es->eventLoop = &el->eventLoop;
    es->state = UA_EVENTSOURCESTATE_STOPPED;

    /* Start if the entire EventLoop is started */
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED)
        res = es->start(es);

    UA_UNLOCK(&el->elMutex);
    return res;
}
428
429
static UA_StatusCode
430
UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el,
431
0
                                        UA_EventSource *es) {
432
0
    UA_LOCK(&el->elMutex);
433
434
0
    if(es->state != UA_EVENTSOURCESTATE_STOPPED) {
435
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
436
0
                       "Cannot deregister the EventSource %.*s: "
437
0
                       "Has to be stopped first",
438
0
                       (int)es->name.length, es->name.data);
439
0
        UA_UNLOCK(&el->elMutex);
440
0
        return UA_STATUSCODE_BADINTERNALERROR;
441
0
    }
442
443
    /* Remove from the linked list */
444
0
    UA_EventSource **s = &el->eventLoop.eventSources;
445
0
    while(*s) {
446
0
        if(*s == es) {
447
0
            *s = es->next;
448
0
            break;
449
0
        }
450
0
        s = &(*s)->next;
451
0
    }
452
453
    /* Set the state to non-registered */
454
0
    es->state = UA_EVENTSOURCESTATE_FRESH;
455
456
0
    UA_UNLOCK(&el->elMutex);
457
0
    return UA_STATUSCODE_GOOD;
458
0
}
459
460
/***************/
461
/* Time Domain */
462
/***************/
463
464
/* Wall-clock time in UA_DateTime (100ns ticks since 1601). On POSIX (except
 * macOS) reads the configurable clockSource via clock_gettime; elsewhere
 * falls back to UA_DateTime_now(). Returns 0 if the clock read fails. */
static UA_DateTime
UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) {
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
    struct timespec ts;
    int res = clock_gettime(pel->clockSource, &ts);
    if(UA_UNLIKELY(res != 0))
        return 0;
    /* ns / 100 converts to the 100ns UA_DateTime resolution */
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
#else
    return UA_DateTime_now();
#endif
}
477
478
/* Monotonic time in UA_DateTime ticks, used for timer scheduling. On POSIX
 * (except macOS) reads the configurable clockSourceMonotonic; elsewhere
 * falls back to UA_DateTime_nowMonotonic(). Returns 0 on clock failure. */
static UA_DateTime
UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) {
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
    struct timespec ts;
    int res = clock_gettime(pel->clockSourceMonotonic, &ts);
    if(UA_UNLIKELY(res != 0))
        return 0;
    /* Also add the unix epoch for the monotonic clock. So we get a "normal"
     * output when a "normal" source is configured. */
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
#else
    return UA_DateTime_nowMonotonic();
#endif
}
493
494
/* Offset of local time to UTC, delegated to the default implementation.
 * The `el` parameter is unused. */
static UA_Int64
UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) {
    /* TODO: Fix for custom clock sources */
    return UA_DateTime_localTimeUtcOffset();
}
499
500
/*************************/
501
/* Initialize and Delete */
502
/*************************/
503
504
/* Destroy a STOPPED/FRESH EventLoop: deregister and free all EventSources,
 * clear the timer tree, run any remaining delayed callbacks, release the
 * params map and finally the loop itself. Fails on a running loop. */
static UA_StatusCode
UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) {
    UA_LOCK(&el->elMutex);

    /* Check if the EventLoop can be deleted */
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED &&
       el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "Cannot delete a running EventLoop");
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Deregister and delete all the EventSources.
     * NOTE(review): deregisterEventSource takes elMutex again while we
     * already hold it -- presumably UA_LOCK is recursive here; confirm the
     * lock implementation. */
    while(el->eventLoop.eventSources) {
        UA_EventSource *es = el->eventLoop.eventSources;
        UA_EventLoopPOSIX_deregisterEventSource(el, es);
        es->free(es);
    }

    /* Remove the repeated timed callbacks */
    UA_Timer_clear(&el->timer);

    /* Process remaining delayed callbacks */
    processDelayed(el);

#ifdef UA_ARCHITECTURE_WIN32
    /* Stop the Windows networking subsystem */
    WSACleanup();
#endif

    UA_KeyValueMap_clear(&el->eventLoop.params);

    /* Clean up */
    UA_UNLOCK(&el->elMutex);
    UA_LOCK_DESTROY(&el->elMutex);
    UA_free(el);
    return UA_STATUSCODE_GOOD;
}
543
544
static void
545
0
UA_EventLoopPOSIX_lock(UA_EventLoop *public_el) {
546
0
    UA_LOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
547
0
}
548
static void
549
0
UA_EventLoopPOSIX_unlock(UA_EventLoop *public_el) {
550
0
    UA_UNLOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
551
0
}
552
553
/* Allocate and initialize a POSIX EventLoop: mutex, timer tree, the two-head
 * delayed-callback queue, the Windows networking subsystem (on WIN32) and
 * all public function pointers. Returns NULL on allocation failure; the
 * caller owns the returned loop and releases it via eventLoop.free. */
UA_EventLoop *
UA_EventLoop_new_POSIX(const UA_Logger *logger) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)
        UA_calloc(1, sizeof(UA_EventLoopPOSIX));
    if(!el)
        return NULL;

    UA_LOCK_INIT(&el->elMutex);
    UA_Timer_init(&el->timer);

    /* Initialize the queue: head1 is the active head (tail points at it),
     * head2 carries the 0x01 sentinel */
    el->delayedTail = &el->delayedHead1;
    el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */

#ifdef UA_ARCHITECTURE_WIN32
    /* Start the WSA networking subsystem on Windows.
     * NOTE(review): the WSAStartup return value is not checked -- confirm
     * failure here is acceptable to ignore. */
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

    /* Set the public EventLoop content */
    el->eventLoop.logger = logger;

    el->eventLoop.start = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start;
    el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop;
    el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free;
    el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run;
    el->eventLoop.cancel = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_cancel;

    el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now;
    el->eventLoop.dateTime_nowMonotonic =
        UA_EventLoopPOSIX_DateTime_nowMonotonic;
    el->eventLoop.dateTime_localTimeUtcOffset =
        UA_EventLoopPOSIX_DateTime_localTimeUtcOffset;

    el->eventLoop.nextTimer = UA_EventLoopPOSIX_nextTimer;
    el->eventLoop.addTimer = UA_EventLoopPOSIX_addTimer;
    el->eventLoop.modifyTimer = UA_EventLoopPOSIX_modifyTimer;
    el->eventLoop.removeTimer = UA_EventLoopPOSIX_removeTimer;
    el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback;
    el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback;

    el->eventLoop.registerEventSource =
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
        UA_EventLoopPOSIX_registerEventSource;
    el->eventLoop.deregisterEventSource =
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
        UA_EventLoopPOSIX_deregisterEventSource;

    el->eventLoop.lock = UA_EventLoopPOSIX_lock;
    el->eventLoop.unlock = UA_EventLoopPOSIX_unlock;

    return &el->eventLoop;
}
607
608
/***************************/
609
/* Network Buffer Handling */
610
/***************************/
611
612
UA_StatusCode
613
UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm,
614
                                     uintptr_t connectionId,
615
                                     UA_ByteString *buf,
616
0
                                     size_t bufSize) {
617
0
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
618
0
    if(pcm->txBuffer.length == 0)
619
0
        return UA_ByteString_allocBuffer(buf, bufSize);
620
0
    if(pcm->txBuffer.length < bufSize)
621
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
622
0
    *buf = pcm->txBuffer;
623
0
    buf->length = bufSize;
624
0
    return UA_STATUSCODE_GOOD;
625
0
}
626
627
void
628
UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm,
629
                                    uintptr_t connectionId,
630
0
                                    UA_ByteString *buf) {
631
0
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
632
0
    if(pcm->txBuffer.data == buf->data)
633
0
        UA_ByteString_init(buf);
634
0
    else
635
0
        UA_ByteString_clear(buf);
636
0
}
637
638
/* (Re)allocate the per-ConnectionManager static rx/tx buffers from the
 * "recv-bufsize" / "send-bufsize" params. The rx buffer always exists; the
 * tx buffer is only allocated when explicitly configured. Status codes of
 * both allocations are OR-combined into the return value. */
UA_StatusCode
UA_EventLoopPOSIX_allocateStaticBuffers(UA_POSIXConnectionManager *pcm) {
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    /* NOTE(review): 2u << 16 is 131072 bytes = 128 KiB, although the
     * original comment claimed "64kb" -- confirm the intended default */
    UA_UInt32 rxBufSize = 2u << 16; /* The default is 64kb */
    const UA_UInt32 *configRxBufSize = (const UA_UInt32 *)
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
                                 UA_QUALIFIEDNAME(0, "recv-bufsize"),
                                 &UA_TYPES[UA_TYPES_UINT32]);
    if(configRxBufSize)
        rxBufSize = *configRxBufSize;
    /* Reallocate only when the size actually changed */
    if(pcm->rxBuffer.length != rxBufSize) {
        UA_ByteString_clear(&pcm->rxBuffer);
        res = UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize);
    }

    const UA_UInt32 *txBufSize = (const UA_UInt32 *)
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
                                 UA_QUALIFIEDNAME(0, "send-bufsize"),
                                 &UA_TYPES[UA_TYPES_UINT32]);
    if(txBufSize && pcm->txBuffer.length != *txBufSize) {
        UA_ByteString_clear(&pcm->txBuffer);
        res |= UA_ByteString_allocBuffer(&pcm->txBuffer, *txBufSize);
    }
    return res;
}
663
664
/******************/
665
/* Socket Options */
666
/******************/
667
668
enum ZIP_CMP
669
0
cmpFD(const UA_FD *a, const UA_FD *b) {
670
0
    if(*a == *b)
671
0
        return ZIP_CMP_EQ;
672
0
    return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE;
673
0
}
674
675
/* Put the socket into non-blocking mode (fcntl O_NONBLOCK on POSIX,
 * ioctlsocket FIONBIO on Windows). Returns BADINTERNALERROR on failure. */
UA_StatusCode
UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) {
#ifndef UA_ARCHITECTURE_WIN32
    /* Read-modify-write the flags so other file status flags are kept */
    int opts = fcntl(sockfd, F_GETFL);
    if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0)
        return UA_STATUSCODE_BADINTERNALERROR;
#else
    u_long iMode = 1;
    if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
        return UA_STATUSCODE_BADINTERNALERROR;
#endif
    return UA_STATUSCODE_GOOD;
}
688
689
/* Suppress SIGPIPE on writes to a closed peer where the platform supports
 * the SO_NOSIGPIPE socket option (e.g. BSD/macOS); a no-op elsewhere. */
UA_StatusCode
UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) {
#ifdef SO_NOSIGPIPE
    int val = 1;
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val));
    if(res < 0)
        return UA_STATUSCODE_BADINTERNALERROR;
#endif
    return UA_STATUSCODE_GOOD;
}
699
700
/* Allow rebinding the address (and, on non-Windows, the port) while sockets
 * linger in TIME_WAIT: SO_REUSEADDR everywhere, plus SO_REUSEPORT on
 * non-Windows platforms. */
UA_StatusCode
UA_EventLoopPOSIX_setReusable(UA_FD sockfd) {
    int enableReuseVal = 1;
#ifndef UA_ARCHITECTURE_WIN32
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
    res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
#else
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
#endif
}
715
716
/************************/
717
/* Select / epoll Logic */
718
/************************/
719
720
/* Re-arm the self-pipe socket for the next signal by reading from it.
 * On POSIX the pipe is drained in a loop until read() would block (the pipe
 * is presumably non-blocking -- confirm where it is created); on Windows a
 * single recv() is issued. Read errors are deliberately ignored. */
static void
flushSelfPipe(UA_SOCKET s) {
    char buf[128];
#ifdef UA_ARCHITECTURE_WIN32
    recv(s, buf, 128, 0);
#else
    ssize_t i;
    do {
        i = read(s, buf, 128);
    } while(i > 0);
#endif
}
733
734
#if !defined(UA_HAVE_EPOLL)
735
736
UA_StatusCode
737
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
738
    UA_LOCK_ASSERT(&el->elMutex);
739
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
740
                 "Registering fd: %u", (unsigned)rfd->fd);
741
742
    /* Realloc */
743
    UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
744
        UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * (el->fdsSize + 1));
745
    if(!fds_tmp) {
746
        return UA_STATUSCODE_BADOUTOFMEMORY;
747
    }
748
    el->fds = fds_tmp;
749
750
    /* Add to the last entry */
751
    el->fds[el->fdsSize] = rfd;
752
    el->fdsSize++;
753
    return UA_STATUSCODE_GOOD;
754
}
755
756
UA_StatusCode
757
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
758
    /* Do nothing, it is enough if the data was changed in the rfd */
759
    UA_LOCK_ASSERT(&el->elMutex);
760
    return UA_STATUSCODE_GOOD;
761
}
762
763
void
764
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
765
    UA_LOCK_ASSERT(&el->elMutex);
766
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
767
                 "Unregistering fd: %u", (unsigned)rfd->fd);
768
769
    /* Find the entry */
770
    size_t i = 0;
771
    for(; i < el->fdsSize; i++) {
772
        if(el->fds[i] == rfd)
773
            break;
774
    }
775
776
    /* Not found? */
777
    if(i == el->fdsSize)
778
        return;
779
780
    if(el->fdsSize > 1) {
781
        /* Move the last entry in the ith slot and realloc. */
782
        el->fdsSize--;
783
        el->fds[i] = el->fds[el->fdsSize];
784
        UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
785
            UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * el->fdsSize);
786
        /* if realloc fails the fds are still in a correct state with
787
         * possibly lost memory, so failing silently here is ok */
788
        if(fds_tmp)
789
            el->fds = fds_tmp;
790
    } else {
791
        /* Remove the last entry */
792
        UA_free(el->fds);
793
        el->fds = NULL;
794
        el->fdsSize = 0;
795
    }
796
}
797
798
/* Populate the three fd_sets for select() from the registered fds and the
 * self-pipe. Read/write membership follows each rfd's listenEvents; every fd
 * is added to the error set. Returns the highest fd (for select's nfds
 * argument). Requires elMutex. */
static UA_FD
setFDSets(UA_EventLoopPOSIX *el, fd_set *readset, fd_set *writeset, fd_set *errset) {
    UA_LOCK_ASSERT(&el->elMutex);

    FD_ZERO(readset);
    FD_ZERO(writeset);
    FD_ZERO(errset);

    /* Always listen on the read-end of the pipe */
    UA_FD highestfd = el->selfpipe[0];
    FD_SET(el->selfpipe[0], readset);

    for(size_t i = 0; i < el->fdsSize; i++) {
        UA_FD currentFD = el->fds[i]->fd;

        /* Add to the fd_sets */
        if(el->fds[i]->listenEvents & UA_FDEVENT_IN)
            FD_SET(currentFD, readset);
        if(el->fds[i]->listenEvents & UA_FDEVENT_OUT)
            FD_SET(currentFD, writeset);

        /* Always return errors */
        FD_SET(currentFD, errset);

        /* Highest fd? */
        if(currentFD > highestfd)
            highestfd = currentFD;
    }
    return highestfd;
}
828
829
/* select()-based polling backend (used when epoll is not available).
 * Waits up to listenTimeout (100ns ticks) for events on the registered
 * fds and dispatches each signaled event to its EventSource callback.
 * Returns GOOD even on select errors (the caller simply retries). */
UA_StatusCode
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
    UA_assert(listenTimeout >= 0);
    UA_LOCK_ASSERT(&el->elMutex);

    fd_set readset, writeset, errset;
    UA_FD highestfd = setFDSets(el, &readset, &writeset, &errset);

    /* Nothing to do? */
    if(highestfd == UA_INVALID_FD) {
        UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                     "No valid FDs for processing");
        return UA_STATUSCODE_GOOD;
    }

    /* Convert the 100ns-tick timeout into seconds/microseconds for select.
     * Windows uses long for both timeval members. */
    struct timeval tmptv = {
#ifndef UA_ARCHITECTURE_WIN32
        (time_t)(listenTimeout / UA_DATETIME_SEC),
        (suseconds_t)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
#else
        (long)(listenTimeout / UA_DATETIME_SEC),
        (long)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
#endif
    };

    /* Drop the mutex while blocking in select so other threads can
     * interact with the EventLoop (e.g. trigger the self-pipe) */
    UA_UNLOCK(&el->elMutex);
    int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
    UA_LOCK(&el->elMutex);
    if(selectStatus < 0) {
        /* We will retry, only log the error */
        UA_LOG_SOCKET_ERRNO_WRAP(
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                           "Error during select: %s", errno_str));
        return UA_STATUSCODE_GOOD;
    }

    /* The self-pipe has received. Clear the buffer by reading. */
    if(UA_UNLIKELY(FD_ISSET(el->selfpipe[0], &readset)))
        flushSelfPipe(el->selfpipe[0]);

    /* Loop over all registered FD to see if an event arrived. Yes, this is why
     * select is slow for many open sockets. */
    for(size_t i = 0; i < el->fdsSize; i++) {
        UA_RegisteredFD *rfd = el->fds[i];

        /* The rfd is already registered for removal. Don't process incoming
         * events any longer. */
        if(rfd->dc.callback)
            continue;

        /* Event signaled for the fd? Only one event type is reported per
         * iteration; read has priority over write over error. */
        short event = 0;
        if(FD_ISSET(rfd->fd, &readset)) {
            event = UA_FDEVENT_IN;
        } else if(FD_ISSET(rfd->fd, &writeset)) {
            event = UA_FDEVENT_OUT;
        } else if(FD_ISSET(rfd->fd, &errset)) {
            event = UA_FDEVENT_ERR;
        } else {
            continue;
        }

        UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                     "Processing event %u on fd %u", (unsigned)event,
                     (unsigned)rfd->fd);

        /* Call the EventSource callback */
        rfd->eventSourceCB(rfd->es, rfd, event);

        /* The fd has removed itself: the callback may deregister rfd, which
         * compacts el->fds. Re-test index i so the element that slid into
         * this slot is not skipped. */
        if(i == el->fdsSize || rfd != el->fds[i])
            i--;
    }
    return UA_STATUSCODE_GOOD;
}
904
905
#else /* defined(UA_HAVE_EPOLL) */
906
907
UA_StatusCode
908
0
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
909
0
    struct epoll_event event;
910
0
    memset(&event, 0, sizeof(struct epoll_event));
911
0
    event.data.ptr = rfd;
912
0
    event.events = 0;
913
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
914
0
        event.events |= EPOLLIN;
915
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
916
0
        event.events |= EPOLLOUT;
917
918
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, rfd->fd, &event);
919
0
    if(err != 0) {
920
0
        UA_LOG_SOCKET_ERRNO_WRAP(
921
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
922
0
                          "TCP %u\t| Could not register for epoll (%s)",
923
0
                          rfd->fd, errno_str));
924
0
        return UA_STATUSCODE_BADINTERNALERROR;
925
0
    }
926
0
    return UA_STATUSCODE_GOOD;
927
0
}
928
929
UA_StatusCode
930
0
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
931
0
    struct epoll_event event;
932
0
    memset(&event, 0, sizeof(struct epoll_event));
933
0
    event.data.ptr = rfd;
934
0
    event.events = 0;
935
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
936
0
        event.events |= EPOLLIN;
937
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
938
0
        event.events |= EPOLLOUT;
939
940
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_MOD, rfd->fd, &event);
941
0
    if(err != 0) {
942
0
        UA_LOG_SOCKET_ERRNO_WRAP(
943
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
944
0
                          "TCP %u\t| Could not modify for epoll (%s)",
945
0
                          rfd->fd, errno_str));
946
0
        return UA_STATUSCODE_BADINTERNALERROR;
947
0
    }
948
0
    return UA_STATUSCODE_GOOD;
949
0
}
950
951
void
952
0
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
953
0
    int res = epoll_ctl(el->epollfd, EPOLL_CTL_DEL, rfd->fd, NULL);
954
0
    if(res != 0) {
955
0
        UA_LOG_SOCKET_ERRNO_WRAP(
956
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
957
0
                          "TCP %u\t| Could not deregister from epoll (%s)",
958
0
                          rfd->fd, errno_str));
959
0
    }
960
0
}
961
962
/* epoll-based polling backend. Waits up to listenTimeout (100ns ticks)
 * and dispatches each received event to its EventSource callback.
 * EINTR is treated as a benign timeout; other epoll errors return
 * BADINTERNALERROR. */
UA_StatusCode
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
    UA_assert(listenTimeout >= 0);

    /* If there is a positive timeout, wait at least one millisecond, the
     * minimum for blocking epoll_wait. This prevents a busy-loop, as the
     * open62541 library allows even smaller timeouts, which can result in a
     * zero timeout due to rounding to an integer here. */
    int timeout = (int)(listenTimeout / UA_DATETIME_MSEC);
    if(timeout == 0 && listenTimeout > 0)
        timeout = 1;

    /* Poll the registered sockets. The mutex is dropped while blocking so
     * other threads can interact with the EventLoop (e.g. cancel it). */
    struct epoll_event epoll_events[64];
    UA_UNLOCK(&el->elMutex);
    int events = epoll_wait(el->epollfd, epoll_events, 64, timeout);
    UA_LOCK(&el->elMutex);

    /* TODO: Replace with pwait2 for higher-precision timeouts once this is
     * available in the standard library.
     *
     * struct timespec precisionTimeout = {
     *  (long)(listenTimeout / UA_DATETIME_SEC),
     *   (long)((listenTimeout % UA_DATETIME_SEC) * 100)
     * };
     * int events = epoll_pwait2(epollfd, epoll_events, 64,
     *                        precisionTimeout, NULL); */

    /* Handle error conditions */
    if(events == -1) {
        if(errno == EINTR) {
            /* We will retry, only log the error */
            UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                         "Timeout during poll");
            return UA_STATUSCODE_GOOD;
        }
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "TCP\t| Error %s, closing the server socket",
                          errno_str));
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Process all received events */
    for(int i = 0; i < events; i++) {
        UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr;

        /* The self-pipe has received (it is registered with a NULL data
         * pointer — presumably at EventLoop start; verify at the
         * registration site). Drain it and continue. */
        if(!rfd) {
            flushSelfPipe(el->selfpipe[0]);
            continue;
        }

        /* The rfd is already registered for removal. Don't process incoming
         * events any longer. */
        if(rfd->dc.callback)
            continue;

        /* Get the event. Only one event type is reported per entry;
         * read has priority over write over error. */
        short revent = 0;
        if((epoll_events[i].events & EPOLLIN) == EPOLLIN) {
            revent = UA_FDEVENT_IN;
        } else if((epoll_events[i].events & EPOLLOUT) == EPOLLOUT) {
            revent = UA_FDEVENT_OUT;
        } else {
            revent = UA_FDEVENT_ERR;
        }

        /* Call the EventSource callback */
        rfd->eventSourceCB(rfd->es, rfd, revent);
    }
    return UA_STATUSCODE_GOOD;
}
1035
1036
#endif /* defined(UA_HAVE_EPOLL) */
1037
1038
#if defined(UA_ARCHITECTURE_WIN32) || defined(__APPLE__)
1039
/* Emulate pipe(2) with a loopback TCP connection (Windows cannot
 * select() on anonymous pipes; the same path is used on macOS).
 * fds[0] is the connecting socket, fds[1] the accepted peer; both ends
 * are made non-blocking. Returns 0 on success, -1 on failure.
 *
 * Fix over the previous revision: bind/listen/getsockname failures were
 * silently ignored, so a failed setup could block forever in accept().
 * Now each step is checked and the listener is closed on every path. */
int UA_EventLoopPOSIX_pipe(SOCKET fds[2]) {
    struct sockaddr_in inaddr;
    memset(&inaddr, 0, sizeof(inaddr));
    inaddr.sin_family = AF_INET;
    inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    inaddr.sin_port = 0; /* Let the OS pick an ephemeral port */

    int err = -1;
    SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if(bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr)) != 0)
        goto cleanup;
    if(listen(lst, 1) != 0)
        goto cleanup;

    /* Look up the ephemeral port the listener was bound to */
    struct sockaddr_storage addr;
    memset(&addr, 0, sizeof(addr));
    int len = sizeof(addr);
    if(getsockname(lst, (struct sockaddr*)&addr, &len) != 0)
        goto cleanup;

    /* Connect one end and accept the other */
    fds[0] = socket(AF_INET, SOCK_STREAM, 0);
    err = connect(fds[0], (struct sockaddr*)&addr, len);
    if(err != 0)
        goto cleanup;
    fds[1] = accept(lst, 0, 0);

    /* Configure both ends for use as the EventLoop self-pipe */
    UA_EventLoopPOSIX_setNoSigPipe(fds[0]);
    UA_EventLoopPOSIX_setReusable(fds[0]);
    UA_EventLoopPOSIX_setNonBlocking(fds[0]);
    UA_EventLoopPOSIX_setNoSigPipe(fds[1]);
    UA_EventLoopPOSIX_setReusable(fds[1]);
    UA_EventLoopPOSIX_setNonBlocking(fds[1]);

 cleanup:
    /* The listener is only needed to establish the connection */
#ifdef UA_ARCHITECTURE_WIN32
    closesocket(lst);
#endif
#ifdef __APPLE__
    close(lst);
#endif
    return err;
}
1073
#elif defined(__QNX__)
1074
/* Create the self-pipe used to interrupt a blocking poll from another
 * thread (QNX). Returns 0 on success, -1 on failure (errno is set).
 *
 * Fixes over the previous revision: only the read end was made
 * non-blocking, so signaling via write() in UA_EventLoopPOSIX_cancel
 * could block on a full pipe; also F_SETFL was called with a bare
 * O_NONBLOCK, clobbering any existing status flags instead of adding
 * to them. */
int UA_EventLoopPOSIX_pipe(int fds[2]) {
    int err = pipe(fds);
    if(err == -1) {
      return err;
    }

    /* Make both ends non-blocking, preserving the existing flags */
    for(int i = 0; i < 2; i++) {
        int flags = fcntl(fds[i], F_GETFL);
        if(flags == -1) {
          return -1;
        }
        if(fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
          return -1;
        }
    }
    return 0;
}
1086
#endif
1087
1088
/* Interrupt the EventLoop from another thread: write a single byte into
 * the self-pipe so a blocking select/epoll_wait wakes up. The byte is
 * drained by flushSelfPipe in the polling code. No-op if the EventLoop
 * is not currently executing. */
void
UA_EventLoopPOSIX_cancel(UA_EventLoopPOSIX *el) {
    /* Nothing to do if the EventLoop is not executing */
    if(!el->executing)
        return;

    /* Trigger the self-pipe. On Windows the self-pipe is a socket pair,
     * hence send() instead of write(). */
#ifdef UA_ARCHITECTURE_WIN32
    int err = send(el->selfpipe[1], ".", 1, 0);
#else
    ssize_t err = write(el->selfpipe[1], ".", 1);
#endif
    if(err <= 0) {
        /* Cannot recover here; the loop may simply wake later via timeout */
        UA_LOG_SOCKET_ERRNO_WRAP(
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                           "Eventloop\t| Error signaling self-pipe (%s)", errno_str));
    }
}
1106
1107
#endif