Coverage Report

Created: 2026-03-31 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541/arch/posix/eventloop_posix.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes)
7
 */
8
9
#include "eventloop_posix.h"
10
#include "open62541/plugin/eventloop.h"
11
12
#if defined(UA_ARCHITECTURE_POSIX) && !defined(UA_ARCHITECTURE_LWIP) || defined(UA_ARCHITECTURE_WIN32)
13
14
/*********/
15
/* Timer */
16
/*********/
17
18
static UA_DateTime
19
0
UA_EventLoopPOSIX_nextTimer(UA_EventLoop *public_el) {
20
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
21
0
    if(el->delayedHead1 > (UA_DelayedCallback *)0x01 ||
22
0
       el->delayedHead2 > (UA_DelayedCallback *)0x01)
23
0
        return el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
24
0
    return UA_Timer_next(&el->timer);
25
0
}
26
27
static UA_StatusCode
28
UA_EventLoopPOSIX_addTimer(UA_EventLoop *public_el, UA_Callback cb,
29
                           void *application, void *data, UA_Double interval_ms,
30
                           UA_DateTime *baseTime, UA_TimerPolicy timerPolicy,
31
0
                           UA_UInt64 *callbackId) {
32
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
33
0
    return UA_Timer_add(&el->timer, cb, application, data, interval_ms,
34
0
                        public_el->dateTime_nowMonotonic(public_el),
35
0
                        baseTime, timerPolicy, callbackId);
36
0
}
37
38
static UA_StatusCode
39
UA_EventLoopPOSIX_modifyTimer(UA_EventLoop *public_el,
40
                              UA_UInt64 callbackId,
41
                              UA_Double interval_ms,
42
                              UA_DateTime *baseTime,
43
0
                              UA_TimerPolicy timerPolicy) {
44
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
45
0
    return UA_Timer_modify(&el->timer, callbackId, interval_ms,
46
0
                           public_el->dateTime_nowMonotonic(public_el),
47
0
                           baseTime, timerPolicy);
48
0
}
49
50
static void
51
UA_EventLoopPOSIX_removeTimer(UA_EventLoop *public_el,
52
0
                              UA_UInt64 callbackId) {
53
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
54
0
    UA_Timer_remove(&el->timer, callbackId);
55
0
}
56
57
void
58
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
59
0
                                     UA_DelayedCallback *dc) {
60
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
61
0
    dc->next = NULL;
62
63
    /* el->delayedTail points either to prev->next or to the head.
64
     * We need to update two locations:
65
     * 1: el->delayedTail = &dc->next;
66
     * 2: *oldtail = dc; (equal to &dc->next)
67
     *
68
     * Once we have (1), we "own" the previous-to-last entry. No need to worry
69
     * about (2), we can adjust it with a delay. This makes the queue
70
     * "eventually consistent". */
71
0
    UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
72
0
        UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
73
0
    UA_atomic_xchg((void**)oldtail, &dc->next);
74
0
}
75
76
/* Resets the delayed queue and returns the previous head and tail */
77
static void
78
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
79
0
                  UA_DelayedCallback **oldTail) {
80
0
    if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
81
0
       el->delayedHead2 <= (UA_DelayedCallback *)0x01)
82
0
        return; /* The queue is empty */
83
84
0
    UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
85
0
    UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
86
0
    UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;
87
88
    /* Switch active/inactive by resetting the sentinel values. The (old) active
89
     * head points to an element which we return. Parallel threads continue to
90
     * add elements to the queue "below" the first element. */
91
0
    UA_atomic_xchg((void**)inactiveHead, NULL);
92
0
    *oldHead = (UA_DelayedCallback *)
93
0
        UA_atomic_xchg((void**)activeHead, (void*)0x01);
94
95
    /* Make the tail point to the (new) active head. Return the value of last
96
     * tail. When iterating over the queue elements, we need to find this tail
97
     * as the last element. If we find a NULL next-pointer before hitting the
98
     * tail spinlock until the pointer updates (eventually consistent). */
99
0
    *oldTail = (UA_DelayedCallback*)
100
0
        UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
101
0
}
102
103
static void
104
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
105
0
                                        UA_DelayedCallback *dc) {
106
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
107
0
    UA_LOCK(&el->elMutex);
108
109
    /* Reset and get the old head and tail */
110
0
    UA_DelayedCallback *cur = NULL, *tail = NULL;
111
0
    resetDelayedQueue(el, &cur, &tail);
112
113
    /* Loop until we reach the tail (or head and tail are both NULL) */
114
0
    UA_DelayedCallback *next;
115
0
    for(; cur; cur = next) {
116
        /* Spin-loop until the next-pointer of cur is updated.
117
         * The element pointed to by tail must appear eventually. */
118
0
        next = cur->next;
119
0
        while(!next && cur != tail)
120
0
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
121
0
        if(cur == dc)
122
0
            continue;
123
0
        UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
124
0
    }
125
126
0
    UA_UNLOCK(&el->elMutex);
127
0
}
128
129
static void
130
0
processDelayed(UA_EventLoopPOSIX *el) {
131
0
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
132
0
                 "Process delayed callbacks");
133
134
0
    UA_LOCK_ASSERT(&el->elMutex);
135
136
    /* Reset and get the old head and tail */
137
0
    UA_DelayedCallback *dc = NULL, *tail = NULL;
138
0
    resetDelayedQueue(el, &dc, &tail);
139
140
    /* Loop until we reach the tail (or head and tail are both NULL) */
141
0
    UA_DelayedCallback *next;
142
0
    for(; dc; dc = next) {
143
0
        next = dc->next;
144
0
        while(!next && dc != tail)
145
0
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
146
0
        if(!dc->callback)
147
0
            continue;
148
0
        dc->callback(dc->application, dc->context);
149
0
    }
150
0
}
151
152
/***********************/
153
/* EventLoop Lifecycle */
154
/***********************/
155
156
static UA_StatusCode
157
0
UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) {
158
0
    UA_LOCK(&el->elMutex);
159
160
0
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH &&
161
0
       el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) {
162
0
        UA_UNLOCK(&el->elMutex);
163
0
        return UA_STATUSCODE_BADINTERNALERROR;
164
0
    }
165
166
0
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
167
0
                 "Starting the EventLoop");
168
169
    /* Setting custom clock source */
170
0
    const UA_Int32 *cs = (const UA_Int32*)
171
0
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
172
0
                                 UA_QUALIFIEDNAME(0, "clock-source"),
173
0
                                 &UA_TYPES[UA_TYPES_INT32]);
174
0
    if(cs)
175
0
        el->clockSource = *cs;
176
177
0
    const UA_Int32 *csm = (const UA_Int32*)
178
0
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
179
0
                                 UA_QUALIFIEDNAME(0, "clock-source-monotonic"),
180
0
                                 &UA_TYPES[UA_TYPES_INT32]);
181
0
    if(csm) {
182
0
        if(el->clockSourceMonotonic != *csm && el->timer.idTree.root) {
183
0
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
184
0
                           "Eventloop\t| Setting a different monotonic clock, ",
185
0
                           "but existing timers have been registered with a "
186
0
                           "different clock source");
187
0
        }
188
0
        el->clockSourceMonotonic = *csm;
189
0
    }
190
191
#if !defined(UA_ARCHITECTURE_POSIX)
192
    if(cs || csm) {
193
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
194
                       "Eventloop\t| Setting a different clock source ",
195
                       "not supported for this architecture");
196
    }
197
#endif
198
199
    /* Create the self-pipe */
200
0
    int err = UA_EventLoopPOSIX_pipe(el->selfpipe);
201
0
    if(err != 0) {
202
0
        UA_LOG_SOCKET_ERRNO_WRAP(
203
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
204
0
                          "Eventloop\t| Could not create the self-pipe (%s)",
205
0
                          errno_str));
206
0
        UA_UNLOCK(&el->elMutex);
207
0
        return UA_STATUSCODE_BADINTERNALERROR;
208
0
    }
209
210
    /* Create the epoll socket */
211
0
#ifdef UA_HAVE_EPOLL
212
0
    el->epollfd = epoll_create1(0);
213
0
    if(el->epollfd == -1) {
214
0
        UA_LOG_SOCKET_ERRNO_WRAP(
215
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
216
0
                          "Eventloop\t| Could not create the epoll socket (%s)",
217
0
                          errno_str));
218
0
        UA_close(el->selfpipe[0]);
219
0
        UA_close(el->selfpipe[1]);
220
0
        UA_UNLOCK(&el->elMutex);
221
0
        return UA_STATUSCODE_BADINTERNALERROR;
222
0
    }
223
224
    /* epoll always listens on the self-pipe. This is the only epoll_event that
225
     * has a NULL data pointer. */
226
0
    struct epoll_event event;
227
0
    memset(&event, 0, sizeof(struct epoll_event));
228
0
    event.events = EPOLLIN;
229
0
    err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, el->selfpipe[0], &event);
230
0
    if(err != 0) {
231
0
        UA_LOG_SOCKET_ERRNO_WRAP(
232
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
233
0
                          "Eventloop\t| Could not register the self-pipe for epoll (%s)",
234
0
                          errno_str));
235
0
        UA_close(el->selfpipe[0]);
236
0
        UA_close(el->selfpipe[1]);
237
0
        close(el->epollfd);
238
0
        UA_UNLOCK(&el->elMutex);
239
0
        return UA_STATUSCODE_BADINTERNALERROR;
240
0
    }
241
0
#endif
242
243
    /* Start the EventSources */
244
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
245
0
    UA_EventSource *es = el->eventLoop.eventSources;
246
0
    while(es) {
247
0
        res |= es->start(es);
248
0
        es = es->next;
249
0
    }
250
251
    /* Dirty-write the state that is const "from the outside" */
252
0
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
253
0
        UA_EVENTLOOPSTATE_STARTED;
254
255
0
    UA_UNLOCK(&el->elMutex);
256
0
    return res;
257
0
}
258
259
static void
260
0
checkClosed(UA_EventLoopPOSIX *el) {
261
0
    UA_LOCK_ASSERT(&el->elMutex);
262
263
0
    UA_EventSource *es = el->eventLoop.eventSources;
264
0
    while(es) {
265
0
        if(es->state != UA_EVENTSOURCESTATE_STOPPED)
266
0
            return;
267
0
        es = es->next;
268
0
    }
269
270
    /* Not closed until all delayed callbacks are processed */
271
0
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
272
0
        return;
273
274
    /* Close the self-pipe when everything else is done */
275
0
    UA_close(el->selfpipe[0]);
276
0
    UA_close(el->selfpipe[1]);
277
278
    /* Dirty-write the state that is const "from the outside" */
279
0
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
280
0
        UA_EVENTLOOPSTATE_STOPPED;
281
282
    /* Close the epoll/IOCP socket once all EventSources have shut down */
283
0
#ifdef UA_HAVE_EPOLL
284
0
    UA_close(el->epollfd);
285
0
#endif
286
287
0
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
288
0
                 "The EventLoop has stopped");
289
0
}
290
291
static void
292
0
UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) {
293
0
    UA_LOCK(&el->elMutex);
294
295
0
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) {
296
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
297
0
                       "The EventLoop is not running, cannot be stopped");
298
0
        UA_UNLOCK(&el->elMutex);
299
0
        return;
300
0
    }
301
302
0
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
303
0
                 "Stopping the EventLoop");
304
305
    /* Set to STOPPING to prevent "normal use" */
306
0
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
307
0
        UA_EVENTLOOPSTATE_STOPPING;
308
309
    /* Stop all event sources (asynchronous) */
310
0
    UA_EventSource *es = el->eventLoop.eventSources;
311
0
    for(; es; es = es->next) {
312
0
        if(es->state == UA_EVENTSOURCESTATE_STARTING ||
313
0
           es->state == UA_EVENTSOURCESTATE_STARTED) {
314
0
            es->stop(es);
315
0
        }
316
0
    }
317
318
    /* Set to STOPPED if all EventSources are STOPPED */
319
0
    checkClosed(el);
320
321
0
    UA_UNLOCK(&el->elMutex);
322
0
}
323
324
static UA_StatusCode
325
0
UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
326
0
    UA_LOCK(&el->elMutex);
327
328
0
    if(el->executing) {
329
0
        UA_LOG_ERROR(el->eventLoop.logger,
330
0
                     UA_LOGCATEGORY_EVENTLOOP,
331
0
                     "Cannot run EventLoop from the run method itself");
332
0
        UA_UNLOCK(&el->elMutex);
333
0
        return UA_STATUSCODE_BADINTERNALERROR;
334
0
    }
335
336
0
    el->executing = true;
337
338
0
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH ||
339
0
       el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) {
340
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
341
0
                       "Cannot run a stopped EventLoop");
342
0
        el->executing = false;
343
0
        UA_UNLOCK(&el->elMutex);
344
0
        return UA_STATUSCODE_BADINTERNALERROR;
345
0
    }
346
347
0
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
348
0
                 "Iterate the EventLoop");
349
350
    /* Process cyclic callbacks */
351
0
    UA_DateTime dateBefore =
352
0
        el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
353
354
0
    UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore);
355
356
    /* Process delayed callbacks here:
357
     * - Removes closed sockets already here instead of polling them again.
358
     * - The timeout for polling is selected to be ready in time for the next
359
     *   cyclic callback. So we want to do little work between the timeout
360
     *   running out and executing the due cyclic callbacks. */
361
0
    processDelayed(el);
362
363
    /* A delayed callback could create another delayed callback (or re-add
364
     * itself). In that case we don't want to wait (indefinitely) for an event
365
     * to happen. Process queued events but don't sleep. Then process the
366
     * delayed callbacks in the next iteration. */
367
0
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
368
0
        timeout = 0;
369
370
    /* Compute the remaining time */
371
0
    UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC);
372
0
    if(dateNext > maxDate)
373
0
        dateNext = maxDate;
374
0
    UA_DateTime listenTimeout =
375
0
        dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
376
0
    if(listenTimeout < 0)
377
0
        listenTimeout = 0;
378
379
    /* Listen on the active file-descriptors (sockets) from the
380
     * ConnectionManagers */
381
0
    UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout);
382
383
    /* Check if the last EventSource was successfully stopped */
384
0
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING)
385
0
        checkClosed(el);
386
387
0
    el->executing = false;
388
0
    UA_UNLOCK(&el->elMutex);
389
0
    return rv;
390
0
}
391
392
/*****************************/
393
/* Registering Event Sources */
394
/*****************************/
395
396
static UA_StatusCode
397
UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el,
398
0
                                      UA_EventSource *es) {
399
0
    UA_LOCK(&el->elMutex);
400
401
    /* Already registered? */
402
0
    if(es->state != UA_EVENTSOURCESTATE_FRESH) {
403
0
        UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
404
0
                     "Cannot register the EventSource \"%.*s\": "
405
0
                     "already registered",
406
0
                     (int)es->name.length, (char*)es->name.data);
407
0
        UA_UNLOCK(&el->elMutex);
408
0
        return UA_STATUSCODE_BADINTERNALERROR;
409
0
    }
410
411
    /* Add to linked list */
412
0
    es->next = el->eventLoop.eventSources;
413
0
    el->eventLoop.eventSources = es;
414
415
0
    es->eventLoop = &el->eventLoop;
416
0
    es->state = UA_EVENTSOURCESTATE_STOPPED;
417
418
    /* Start if the entire EventLoop is started */
419
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
420
0
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED)
421
0
        res = es->start(es);
422
423
0
    UA_UNLOCK(&el->elMutex);
424
0
    return res;
425
0
}
426
427
static UA_StatusCode
428
UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el,
429
0
                                        UA_EventSource *es) {
430
0
    UA_LOCK(&el->elMutex);
431
432
0
    if(es->state != UA_EVENTSOURCESTATE_STOPPED) {
433
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
434
0
                       "Cannot deregister the EventSource %.*s: "
435
0
                       "Has to be stopped first",
436
0
                       (int)es->name.length, es->name.data);
437
0
        UA_UNLOCK(&el->elMutex);
438
0
        return UA_STATUSCODE_BADINTERNALERROR;
439
0
    }
440
441
    /* Remove from the linked list */
442
0
    UA_EventSource **s = &el->eventLoop.eventSources;
443
0
    while(*s) {
444
0
        if(*s == es) {
445
0
            *s = es->next;
446
0
            break;
447
0
        }
448
0
        s = &(*s)->next;
449
0
    }
450
451
    /* Set the state to non-registered */
452
0
    es->state = UA_EVENTSOURCESTATE_FRESH;
453
454
0
    UA_UNLOCK(&el->elMutex);
455
0
    return UA_STATUSCODE_GOOD;
456
0
}
457
458
/***************/
459
/* Time Domain */
460
/***************/
461
462
static UA_DateTime
463
0
UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) {
464
0
#if defined(UA_ARCHITECTURE_POSIX)
465
0
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
466
0
    struct timespec ts;
467
0
    int res = clock_gettime(pel->clockSource, &ts);
468
0
    if(UA_UNLIKELY(res != 0))
469
0
        return 0;
470
0
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
471
#else
472
    return UA_DateTime_now();
473
#endif
474
0
}
475
476
static UA_DateTime
477
0
UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) {
478
0
#if defined(UA_ARCHITECTURE_POSIX)
479
0
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
480
0
    struct timespec ts;
481
0
    int res = clock_gettime(pel->clockSourceMonotonic, &ts);
482
0
    if(UA_UNLIKELY(res != 0))
483
0
        return 0;
484
    /* Also add the unix epoch for the monotonic clock. So we get a "normal"
485
     * output when a "normal" source is configured. */
486
0
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
487
#else
488
    return UA_DateTime_nowMonotonic();
489
#endif
490
0
}
491
492
static UA_Int64
493
0
UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) {
494
    /* TODO: Fix for custom clock sources */
495
0
    return UA_DateTime_localTimeUtcOffset();
496
0
}
497
498
/*************************/
499
/* Initialize and Delete */
500
/*************************/
501
502
static UA_StatusCode
503
0
UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) {
504
0
    UA_LOCK(&el->elMutex);
505
506
    /* Check if the EventLoop can be deleted */
507
0
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED &&
508
0
       el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) {
509
0
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
510
0
                       "Cannot delete a running EventLoop");
511
0
        UA_UNLOCK(&el->elMutex);
512
0
        return UA_STATUSCODE_BADINTERNALERROR;
513
0
    }
514
515
    /* Deregister and delete all the EventSources */
516
0
    while(el->eventLoop.eventSources) {
517
0
        UA_EventSource *es = el->eventLoop.eventSources;
518
0
        UA_EventLoopPOSIX_deregisterEventSource(el, es);
519
0
        es->free(es);
520
0
    }
521
522
    /* Remove the repeated timed callbacks */
523
0
    UA_Timer_clear(&el->timer);
524
525
    /* Process remaining delayed callbacks */
526
0
    processDelayed(el);
527
528
#ifdef UA_ARCHITECTURE_WIN32
529
    /* Stop the Windows networking subsystem */
530
    WSACleanup();
531
#endif
532
533
0
    UA_KeyValueMap_clear(&el->eventLoop.params);
534
535
    /* Clean up */
536
0
    UA_UNLOCK(&el->elMutex);
537
0
    UA_LOCK_DESTROY(&el->elMutex);
538
0
    UA_free(el);
539
0
    return UA_STATUSCODE_GOOD;
540
0
}
541
542
static void
543
0
UA_EventLoopPOSIX_lock(UA_EventLoop *public_el) {
544
0
    UA_LOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
545
0
}
546
static void
547
0
UA_EventLoopPOSIX_unlock(UA_EventLoop *public_el) {
548
0
    UA_UNLOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex);
549
0
}
550
551
UA_EventLoop *
552
0
UA_EventLoop_new_POSIX(const UA_Logger *logger) {
553
#ifdef UA_ARCHITECTURE_WIN32
554
    /* Start the WSA networking subsystem on Windows */
555
    WSADATA wsaData;
556
    int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
557
    if(iResult != 0) {
558
        UA_LOG_ERROR(logger, UA_LOGCATEGORY_EVENTLOOP,
559
                     "Initializing the WSA subsystem failed: %d", iResult);
560
        return NULL;
561
    }
562
#endif
563
564
0
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)
565
0
        UA_calloc(1, sizeof(UA_EventLoopPOSIX));
566
0
    if(!el)
567
0
        return NULL;
568
569
0
    UA_LOCK_INIT(&el->elMutex);
570
0
    UA_Timer_init(&el->timer);
571
572
    /* Initialize the queue */
573
0
    el->delayedTail = &el->delayedHead1;
574
0
    el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */
575
576
    /* Set the public EventLoop content */
577
0
    el->eventLoop.logger = logger;
578
579
    /* Initialize the clock source to the default */
580
0
#if defined(UA_ARCHITECTURE_POSIX)
581
0
    el->clockSource = CLOCK_REALTIME;
582
0
# ifdef CLOCK_MONOTONIC_RAW
583
0
    el->clockSourceMonotonic = CLOCK_MONOTONIC_RAW;
584
# else
585
    el->clockSourceMonotonic = CLOCK_MONOTONIC;
586
# endif
587
0
#endif
588
589
    /* Set the method pointers for the interface */
590
0
    el->eventLoop.start = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start;
591
0
    el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop;
592
0
    el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free;
593
0
    el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run;
594
0
    el->eventLoop.cancel = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_cancel;
595
596
0
    el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now;
597
0
    el->eventLoop.dateTime_nowMonotonic =
598
0
        UA_EventLoopPOSIX_DateTime_nowMonotonic;
599
0
    el->eventLoop.dateTime_localTimeUtcOffset =
600
0
        UA_EventLoopPOSIX_DateTime_localTimeUtcOffset;
601
602
0
    el->eventLoop.nextTimer = UA_EventLoopPOSIX_nextTimer;
603
0
    el->eventLoop.addTimer = UA_EventLoopPOSIX_addTimer;
604
0
    el->eventLoop.modifyTimer = UA_EventLoopPOSIX_modifyTimer;
605
0
    el->eventLoop.removeTimer = UA_EventLoopPOSIX_removeTimer;
606
0
    el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback;
607
0
    el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback;
608
609
0
    el->eventLoop.registerEventSource =
610
0
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
611
0
        UA_EventLoopPOSIX_registerEventSource;
612
0
    el->eventLoop.deregisterEventSource =
613
0
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
614
0
        UA_EventLoopPOSIX_deregisterEventSource;
615
616
0
    el->eventLoop.lock = UA_EventLoopPOSIX_lock;
617
0
    el->eventLoop.unlock = UA_EventLoopPOSIX_unlock;
618
619
0
    return &el->eventLoop;
620
0
}
621
622
/***************************/
623
/* Network Buffer Handling */
624
/***************************/
625
626
UA_StatusCode
627
UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm,
628
                                     uintptr_t connectionId,
629
                                     UA_ByteString *buf,
630
0
                                     size_t bufSize) {
631
0
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
632
0
    if(pcm->txBuffer.length == 0)
633
0
        return UA_ByteString_allocBuffer(buf, bufSize);
634
0
    if(pcm->txBuffer.length < bufSize)
635
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
636
0
    *buf = pcm->txBuffer;
637
0
    buf->length = bufSize;
638
0
    return UA_STATUSCODE_GOOD;
639
0
}
640
641
void
642
UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm,
643
                                    uintptr_t connectionId,
644
0
                                    UA_ByteString *buf) {
645
0
    UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
646
0
    if(pcm->txBuffer.data == buf->data)
647
0
        UA_ByteString_init(buf);
648
0
    else
649
0
        UA_ByteString_clear(buf);
650
0
}
651
652
UA_StatusCode
653
0
UA_EventLoopPOSIX_allocateStaticBuffers(UA_POSIXConnectionManager *pcm) {
654
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
655
0
    UA_UInt32 rxBufSize = 2u << 16; /* The default is 64kb */
656
0
    const UA_UInt32 *configRxBufSize = (const UA_UInt32 *)
657
0
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
658
0
                                 UA_QUALIFIEDNAME(0, "recv-bufsize"),
659
0
                                 &UA_TYPES[UA_TYPES_UINT32]);
660
0
    if(configRxBufSize)
661
0
        rxBufSize = *configRxBufSize;
662
0
    if(pcm->rxBuffer.length != rxBufSize) {
663
0
        UA_ByteString_clear(&pcm->rxBuffer);
664
0
        res = UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize);
665
0
    }
666
667
0
    const UA_UInt32 *txBufSize = (const UA_UInt32 *)
668
0
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
669
0
                                 UA_QUALIFIEDNAME(0, "send-bufsize"),
670
0
                                 &UA_TYPES[UA_TYPES_UINT32]);
671
0
    if(txBufSize && pcm->txBuffer.length != *txBufSize) {
672
0
        UA_ByteString_clear(&pcm->txBuffer);
673
0
        res |= UA_ByteString_allocBuffer(&pcm->txBuffer, *txBufSize);
674
0
    }
675
0
    return res;
676
0
}
677
678
/******************/
679
/* Socket Options */
680
/******************/
681
682
enum ZIP_CMP
683
0
cmpFD(const UA_FD *a, const UA_FD *b) {
684
0
    if(*a == *b)
685
0
        return ZIP_CMP_EQ;
686
0
    return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE;
687
0
}
688
689
UA_StatusCode
690
0
UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) {
691
0
#ifndef UA_ARCHITECTURE_WIN32
692
0
    int opts = fcntl(sockfd, F_GETFL);
693
0
    if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0)
694
0
        return UA_STATUSCODE_BADINTERNALERROR;
695
#else
696
    u_long iMode = 1;
697
    if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
698
        return UA_STATUSCODE_BADINTERNALERROR;
699
#endif
700
0
    return UA_STATUSCODE_GOOD;
701
0
}
702
703
UA_StatusCode
704
0
UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) {
705
#ifdef SO_NOSIGPIPE
706
    int val = 1;
707
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val));
708
    if(res < 0)
709
        return UA_STATUSCODE_BADINTERNALERROR;
710
#endif
711
0
    return UA_STATUSCODE_GOOD;
712
0
}
713
714
UA_StatusCode
715
0
UA_EventLoopPOSIX_setReusable(UA_FD sockfd) {
716
0
    int enableReuseVal = 1;
717
0
#ifndef UA_ARCHITECTURE_WIN32
718
0
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
719
0
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
720
0
    res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
721
0
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
722
0
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
723
#else
724
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
725
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
726
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
727
#endif
728
0
}
729
730
/************************/
731
/* Select / epoll Logic */
732
/************************/
733
734
/* Re-arm the self-pipe socket for the next signal by reading from it */
735
static void
736
0
flushSelfPipe(UA_SOCKET s) {
737
0
    char buf[128];
738
#ifdef UA_ARCHITECTURE_WIN32
739
    recv(s, buf, 128, 0);
740
#else
741
0
    ssize_t i;
742
0
    do {
743
0
        i = read(s, buf, 128);
744
0
    } while(i > 0);
745
0
#endif
746
0
}
747
748
#if !defined(UA_HAVE_EPOLL)
749
750
UA_StatusCode
751
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
752
    UA_LOCK_ASSERT(&el->elMutex);
753
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
754
                 "Registering fd: %u", (unsigned)rfd->fd);
755
756
    /* Realloc */
757
    UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
758
        UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * (el->fdsSize + 1));
759
    if(!fds_tmp) {
760
        return UA_STATUSCODE_BADOUTOFMEMORY;
761
    }
762
    el->fds = fds_tmp;
763
764
    /* Add to the last entry */
765
    el->fds[el->fdsSize] = rfd;
766
    el->fdsSize++;
767
    return UA_STATUSCODE_GOOD;
768
}
769
770
UA_StatusCode
771
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
772
    /* Do nothing, it is enough if the data was changed in the rfd */
773
    UA_LOCK_ASSERT(&el->elMutex);
774
    return UA_STATUSCODE_GOOD;
775
}
776
777
void
778
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
779
    UA_LOCK_ASSERT(&el->elMutex);
780
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
781
                 "Unregistering fd: %u", (unsigned)rfd->fd);
782
783
    /* Find the entry */
784
    size_t i = 0;
785
    for(; i < el->fdsSize; i++) {
786
        if(el->fds[i] == rfd)
787
            break;
788
    }
789
790
    /* Not found? */
791
    if(i == el->fdsSize)
792
        return;
793
794
    if(el->fdsSize > 1) {
795
        /* Move the last entry in the ith slot and realloc. */
796
        el->fdsSize--;
797
        el->fds[i] = el->fds[el->fdsSize];
798
        UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
799
            UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * el->fdsSize);
800
        /* if realloc fails the fds are still in a correct state with
801
         * possibly lost memory, so failing silently here is ok */
802
        if(fds_tmp)
803
            el->fds = fds_tmp;
804
    } else {
805
        /* Remove the last entry */
806
        UA_free(el->fds);
807
        el->fds = NULL;
808
        el->fdsSize = 0;
809
    }
810
}
811
812
static UA_FD
813
setFDSets(UA_EventLoopPOSIX *el, fd_set *readset, fd_set *writeset, fd_set *errset) {
814
    UA_LOCK_ASSERT(&el->elMutex);
815
816
    FD_ZERO(readset);
817
    FD_ZERO(writeset);
818
    FD_ZERO(errset);
819
820
    /* Always listen on the read-end of the pipe */
821
    UA_FD highestfd = el->selfpipe[0];
822
    FD_SET(el->selfpipe[0], readset);
823
824
    for(size_t i = 0; i < el->fdsSize; i++) {
825
        UA_FD currentFD = el->fds[i]->fd;
826
827
        /* Add to the fd_sets */
828
        if(el->fds[i]->listenEvents & UA_FDEVENT_IN)
829
            FD_SET(currentFD, readset);
830
        if(el->fds[i]->listenEvents & UA_FDEVENT_OUT)
831
            FD_SET(currentFD, writeset);
832
833
        /* Always return errors */
834
        FD_SET(currentFD, errset);
835
836
        /* Highest fd? */
837
        if(currentFD > highestfd)
838
            highestfd = currentFD;
839
    }
840
    return highestfd;
841
}
842
843
UA_StatusCode
844
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
845
    UA_assert(listenTimeout >= 0);
846
    UA_LOCK_ASSERT(&el->elMutex);
847
848
    fd_set readset, writeset, errset;
849
    UA_FD highestfd = setFDSets(el, &readset, &writeset, &errset);
850
851
    /* Nothing to do? */
852
    if(highestfd == UA_INVALID_FD) {
853
        UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
854
                     "No valid FDs for processing");
855
        return UA_STATUSCODE_GOOD;
856
    }
857
858
    struct timeval tmptv = {
859
#ifndef UA_ARCHITECTURE_WIN32
860
        (time_t)(listenTimeout / UA_DATETIME_SEC),
861
        (suseconds_t)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
862
#else
863
        (long)(listenTimeout / UA_DATETIME_SEC),
864
        (long)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
865
#endif
866
    };
867
868
    UA_UNLOCK(&el->elMutex);
869
    int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
870
    UA_LOCK(&el->elMutex);
871
    if(selectStatus < 0) {
872
        /* We will retry, only log the error */
873
        UA_LOG_SOCKET_ERRNO_WRAP(
874
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
875
                           "Error during select: %s", errno_str));
876
        return UA_STATUSCODE_GOOD;
877
    }
878
879
    /* The self-pipe has received. Clear the buffer by reading. */
880
    if(UA_UNLIKELY(FD_ISSET(el->selfpipe[0], &readset)))
881
        flushSelfPipe(el->selfpipe[0]);
882
883
    /* Loop over all registered FD to see if an event arrived. Yes, this is why
884
     * select is slow for many open sockets. */
885
    for(size_t i = 0; i < el->fdsSize; i++) {
886
        UA_RegisteredFD *rfd = el->fds[i];
887
888
        /* The rfd is already registered for removal. Don't process incoming
889
         * events any longer. */
890
        if(rfd->dc.callback)
891
            continue;
892
893
        /* Event signaled for the fd? */
894
        short event = 0;
895
        if(FD_ISSET(rfd->fd, &readset)) {
896
            event = UA_FDEVENT_IN;
897
        } else if(FD_ISSET(rfd->fd, &writeset)) {
898
            event = UA_FDEVENT_OUT;
899
        } else if(FD_ISSET(rfd->fd, &errset)) {
900
            event = UA_FDEVENT_ERR;
901
        } else {
902
            continue;
903
        }
904
905
        UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
906
                     "Processing event %u on fd %u", (unsigned)event,
907
                     (unsigned)rfd->fd);
908
909
        /* Call the EventSource callback */
910
        rfd->eventSourceCB(rfd->es, rfd, event);
911
912
        /* The fd has removed itself */
913
        if(i == el->fdsSize || rfd != el->fds[i])
914
            i--;
915
    }
916
    return UA_STATUSCODE_GOOD;
917
}
918
919
#else /* defined(UA_HAVE_EPOLL) */
920
921
UA_StatusCode
922
0
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
923
0
    struct epoll_event event;
924
0
    memset(&event, 0, sizeof(struct epoll_event));
925
0
    event.data.ptr = rfd;
926
0
    event.events = 0;
927
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
928
0
        event.events |= EPOLLIN;
929
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
930
0
        event.events |= EPOLLOUT;
931
932
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, rfd->fd, &event);
933
0
    if(err != 0) {
934
0
        UA_LOG_SOCKET_ERRNO_WRAP(
935
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
936
0
                          "TCP %u\t| Could not register for epoll (%s)",
937
0
                          rfd->fd, errno_str));
938
0
        return UA_STATUSCODE_BADINTERNALERROR;
939
0
    }
940
0
    return UA_STATUSCODE_GOOD;
941
0
}
942
943
UA_StatusCode
944
0
UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
945
0
    struct epoll_event event;
946
0
    memset(&event, 0, sizeof(struct epoll_event));
947
0
    event.data.ptr = rfd;
948
0
    event.events = 0;
949
0
    if(rfd->listenEvents & UA_FDEVENT_IN)
950
0
        event.events |= EPOLLIN;
951
0
    if(rfd->listenEvents & UA_FDEVENT_OUT)
952
0
        event.events |= EPOLLOUT;
953
954
0
    int err = epoll_ctl(el->epollfd, EPOLL_CTL_MOD, rfd->fd, &event);
955
0
    if(err != 0) {
956
0
        UA_LOG_SOCKET_ERRNO_WRAP(
957
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
958
0
                          "TCP %u\t| Could not modify for epoll (%s)",
959
0
                          rfd->fd, errno_str));
960
0
        return UA_STATUSCODE_BADINTERNALERROR;
961
0
    }
962
0
    return UA_STATUSCODE_GOOD;
963
0
}
964
965
void
966
0
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
967
0
    int res = epoll_ctl(el->epollfd, EPOLL_CTL_DEL, rfd->fd, NULL);
968
0
    if(res != 0) {
969
0
        UA_LOG_SOCKET_ERRNO_WRAP(
970
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
971
0
                          "TCP %u\t| Could not deregister from epoll (%s)",
972
0
                          rfd->fd, errno_str));
973
0
    }
974
0
}
975
976
UA_StatusCode
977
0
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
978
0
    UA_assert(listenTimeout >= 0);
979
980
    /* If there is a positive timeout, wait at least one millisecond, the
981
     * minimum for blocking epoll_wait. This prevents a busy-loop, as the
982
     * open62541 library allows even smaller timeouts, which can result in a
983
     * zero timeout due to rounding to an integer here. */
984
0
    int timeout = (int)(listenTimeout / UA_DATETIME_MSEC);
985
0
    if(timeout == 0 && listenTimeout > 0)
986
0
        timeout = 1;
987
988
    /* Poll the registered sockets */
989
0
    struct epoll_event epoll_events[64];
990
0
    UA_UNLOCK(&el->elMutex);
991
0
    int events = epoll_wait(el->epollfd, epoll_events, 64, timeout);
992
0
    UA_LOCK(&el->elMutex);
993
994
    /* TODO: Replace with pwait2 for higher-precision timeouts once this is
995
     * available in the standard library.
996
     *
997
     * struct timespec precisionTimeout = {
998
     *  (long)(listenTimeout / UA_DATETIME_SEC),
999
     *   (long)((listenTimeout % UA_DATETIME_SEC) * 100)
1000
     * };
1001
     * int events = epoll_pwait2(epollfd, epoll_events, 64,
1002
     *                        precisionTimeout, NULL); */
1003
1004
    /* Handle error conditions */
1005
0
    if(events == -1) {
1006
0
        if(errno == EINTR) {
1007
            /* We will retry, only log the error */
1008
0
            UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
1009
0
                         "Timeout during poll");
1010
0
            return UA_STATUSCODE_GOOD;
1011
0
        }
1012
0
        UA_LOG_SOCKET_ERRNO_WRAP(
1013
0
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
1014
0
                          "TCP\t| Error %s, closing the server socket",
1015
0
                          errno_str));
1016
0
        return UA_STATUSCODE_BADINTERNALERROR;
1017
0
    }
1018
1019
    /* Process all received events */
1020
0
    for(int i = 0; i < events; i++) {
1021
0
        UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr;
1022
1023
        /* The self-pipe has received */
1024
0
        if(!rfd) {
1025
0
            flushSelfPipe(el->selfpipe[0]);
1026
0
            continue;
1027
0
        }
1028
1029
        /* The rfd is already registered for removal. Don't process incoming
1030
         * events any longer. */
1031
0
        if(rfd->dc.callback)
1032
0
            continue;
1033
1034
        /* Get the event */
1035
0
        short revent = 0;
1036
0
        if((epoll_events[i].events & EPOLLIN) == EPOLLIN) {
1037
0
            revent = UA_FDEVENT_IN;
1038
0
        } else if((epoll_events[i].events & EPOLLOUT) == EPOLLOUT) {
1039
0
            revent = UA_FDEVENT_OUT;
1040
0
        } else {
1041
0
            revent = UA_FDEVENT_ERR;
1042
0
        }
1043
1044
        /* Call the EventSource callback */
1045
0
        rfd->eventSourceCB(rfd->es, rfd, revent);
1046
0
    }
1047
0
    return UA_STATUSCODE_GOOD;
1048
0
}
1049
1050
#endif /* defined(UA_HAVE_EPOLL) */
1051
1052
#if defined(UA_ARCHITECTURE_WIN32) || defined(__APPLE__)
1053
int UA_EventLoopPOSIX_pipe(SOCKET fds[2]) {
1054
    struct sockaddr_in inaddr;
1055
    memset(&inaddr, 0, sizeof(inaddr));
1056
    inaddr.sin_family = AF_INET;
1057
    inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1058
    inaddr.sin_port = 0;
1059
1060
    SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1061
    bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr));
1062
    listen(lst, 1);
1063
1064
    struct sockaddr_storage addr;
1065
    memset(&addr, 0, sizeof(addr));
1066
    int len = sizeof(addr);
1067
    getsockname(lst, (struct sockaddr*)&addr, &len);
1068
1069
    fds[0] = socket(AF_INET, SOCK_STREAM, 0);
1070
    int err = connect(fds[0], (struct sockaddr*)&addr, len);
1071
    fds[1] = accept(lst, 0, 0);
1072
#ifdef UA_ARCHITECTURE_WIN32
1073
    closesocket(lst);
1074
#endif
1075
#ifdef __APPLE__
1076
    close(lst);
1077
#endif
1078
1079
    UA_EventLoopPOSIX_setNoSigPipe(fds[0]);
1080
    UA_EventLoopPOSIX_setReusable(fds[0]);
1081
    UA_EventLoopPOSIX_setNonBlocking(fds[0]);
1082
    UA_EventLoopPOSIX_setNoSigPipe(fds[1]);
1083
    UA_EventLoopPOSIX_setReusable(fds[1]);
1084
    UA_EventLoopPOSIX_setNonBlocking(fds[1]);
1085
    return err;
1086
}
1087
#elif defined(__QNX__)
1088
int UA_EventLoopPOSIX_pipe(int fds[2]) {
1089
    int err = pipe(fds); 
1090
    if(err == -1) {
1091
      return err;
1092
    }
1093
1094
    err = fcntl(fds[0], F_SETFL, O_NONBLOCK);
1095
    if(err == -1) {
1096
      return err;
1097
    }
1098
    return err;
1099
}
1100
#endif
1101
1102
void
1103
0
UA_EventLoopPOSIX_cancel(UA_EventLoopPOSIX *el) {
1104
    /* Nothing to do if the EventLoop is not executing */
1105
0
    if(!el->executing)
1106
0
        return;
1107
1108
    /* Trigger the self-pipe */
1109
#ifdef UA_ARCHITECTURE_WIN32
1110
    int err = send(el->selfpipe[1], ".", 1, 0);
1111
#else
1112
0
    ssize_t err = write(el->selfpipe[1], ".", 1);
1113
0
#endif
1114
0
    if(err <= 0) {
1115
        UA_LOG_SOCKET_ERRNO_WRAP(
1116
0
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
1117
0
                           "Eventloop\t| Error signaling self-pipe (%s)", errno_str));
1118
0
    }
1119
0
}
1120
1121
#endif