Coverage Report

Created: 2025-11-24 06:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541/src/server/ua_subscription.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2015 (c) Chris Iatrou
7
 *    Copyright 2015 (c) Joakim L. Gilje
8
 *    Copyright 2015-2016 (c) Sten Grüner
9
 *    Copyright 2015-2016 (c) Oleksiy Vasylyev
10
 *    Copyright 2016-2017 (c) Florian Palm
11
 *    Copyright 2017 (c) frax2222
12
 *    Copyright 2017 (c) Mattias Bornhager
13
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
14
 *    Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
15
 *    Copyright 2017-2018 (c) Ari Breitkreuz, fortiss GmbH
16
 *    Copyright 2018 (c) Hilscher Gesellschaft für Systemautomation mbH (Author: Martin Lang)
17
 *    Copyright 2018 (c) Fabian Arndt, Root-Core
18
 *    Copyright 2019 (c) HMS Industrial Networks AB (Author: Jonas Green)
19
 *    Copyright 2020-2021 (c) Christian von Arnim, ISW University of Stuttgart (for VDW and umati)
20
 */
21
22
#include "ua_server_internal.h"
23
#include "ua_subscription.h"
24
#include "itoa.h"
25
26
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
27
28
0
#define UA_MAX_RETRANSMISSIONQUEUESIZE 256
29
30
/****************/
31
/* Notification */
32
/****************/
33
34
static void UA_Notification_dequeueMon(UA_Notification *n);
35
static void UA_Notification_enqueueSub(UA_Notification *n);
36
static void UA_Notification_dequeueSub(UA_Notification *n);
37
38
UA_Notification *
39
0
UA_Notification_new(void) {
40
0
    UA_Notification *n = (UA_Notification*)UA_calloc(1, sizeof(UA_Notification));
41
0
    if(n) {
42
        /* Set the sentinel for a notification that is not enqueued a
43
         * subscription */
44
0
        TAILQ_NEXT(n, subEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
45
0
    }
46
0
    return n;
47
0
}
48
49
/* Dequeue and delete the notification */
50
static void
51
0
UA_Notification_delete(UA_Notification *n) {
52
0
    UA_assert(n != UA_SUBSCRIPTION_QUEUE_SENTINEL);
53
0
    UA_assert(n->mon);
54
0
    UA_Notification_dequeueMon(n);
55
0
    UA_Notification_dequeueSub(n);
56
0
    switch(n->mon->itemToMonitor.attributeId) {
57
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
58
0
    case UA_ATTRIBUTEID_EVENTNOTIFIER:
59
0
        UA_EventFieldList_clear(&n->data.event);
60
0
        break;
61
0
#endif
62
0
    default:
63
0
        UA_MonitoredItemNotification_clear(&n->data.dataChange);
64
0
        break;
65
0
    }
66
0
    UA_free(n);
67
0
}
68
69
/* Add to the MonitoredItem queue, update all counters and then handle overflow */
70
static void
71
0
UA_Notification_enqueueMon(UA_Server *server, UA_Notification *n) {
72
0
    UA_MonitoredItem *mon = n->mon;
73
0
    UA_assert(mon);
74
75
    /* Add to the MonitoredItem */
76
0
    TAILQ_INSERT_TAIL(&mon->queue, n, monEntry);
77
0
    ++mon->queueSize;
78
79
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
80
0
    if(n->isOverflowEvent)
81
0
        ++mon->eventOverflows;
82
0
#endif
83
84
    /* Test for consistency */
85
0
    UA_assert(mon->queueSize >= mon->eventOverflows);
86
0
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
87
88
    /* Ensure enough space is available in the MonitoredItem. Do this only after
89
     * adding the new Notification. */
90
0
    UA_MonitoredItem_ensureQueueSpace(server, mon);
91
92
0
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, mon->subscription,
93
0
                              "MonitoredItem %" PRIi32 " | "
94
0
                              "Notification enqueued (Queue size %lu / %lu)",
95
0
                              mon->monitoredItemId,
96
0
                              (long unsigned)mon->queueSize,
97
0
                              (long unsigned)mon->parameters.queueSize);
98
0
}
99
100
static void
101
0
UA_Notification_enqueueSub(UA_Notification *n) {
102
0
    UA_MonitoredItem *mon = n->mon;
103
0
    UA_assert(mon);
104
105
0
    UA_Subscription *sub = mon->subscription;
106
0
    UA_assert(sub);
107
108
0
    if(TAILQ_NEXT(n, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL)
109
0
        return;
110
111
    /* Add to the subscription if reporting is enabled */
112
0
    TAILQ_INSERT_TAIL(&sub->notificationQueue, n, subEntry);
113
0
    ++sub->notificationQueueSize;
114
115
0
    switch(mon->itemToMonitor.attributeId) {
116
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
117
0
    case UA_ATTRIBUTEID_EVENTNOTIFIER:
118
0
        ++sub->eventNotifications;
119
0
        break;
120
0
#endif
121
0
    default:
122
0
        ++sub->dataChangeNotifications;
123
0
        break;
124
0
    }
125
0
}
126
127
void
128
0
UA_Notification_enqueueAndTrigger(UA_Server *server, UA_Notification *n) {
129
0
    UA_MonitoredItem *mon = n->mon;
130
0
    UA_Subscription *sub = mon->subscription;
131
0
    UA_assert(sub); /* A MonitoredItem is always attached to a subscription. Can
132
                     * be a local MonitoredItem that gets published immediately
133
                     * with a callback. */
134
135
    /* If reporting or (sampled+triggered), enqueue into the Subscription first
136
     * and then into the MonitoredItem. UA_MonitoredItem_ensureQueueSpace
137
     * (called within UA_Notification_enqueueMon) assumes the notification is
138
     * already in the Subscription's publishing queue. */
139
0
    UA_EventLoop *el = server->config.eventLoop;
140
0
    UA_DateTime nowMonotonic = el->dateTime_nowMonotonic(el);
141
0
    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING ||
142
0
       (mon->monitoringMode == UA_MONITORINGMODE_SAMPLING &&
143
0
        mon->triggeredUntil > nowMonotonic)) {
144
0
        UA_Notification_enqueueSub(n);
145
0
        mon->triggeredUntil = UA_INT64_MIN;
146
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
147
0
                                  "Notification enqueued (Queue size %lu)",
148
0
                                  (long unsigned)sub->notificationQueueSize);
149
0
    }
150
151
    /* Insert into the MonitoredItem. This checks the queue size and
152
     * handles overflow. */
153
0
    UA_Notification_enqueueMon(server, n);
154
155
0
    for(size_t i = mon->triggeringLinksSize - 1; i < mon->triggeringLinksSize; i--) {
156
        /* Get the triggered MonitoredItem. Remove the link if the MI doesn't exist. */
157
0
        UA_MonitoredItem *triggeredMon =
158
0
            UA_Subscription_getMonitoredItem(sub, mon->triggeringLinks[i]);
159
0
        if(!triggeredMon) {
160
0
            UA_MonitoredItem_removeLink(sub, mon, mon->triggeringLinks[i]);
161
0
            continue;
162
0
        }
163
164
        /* Only sampling MonitoredItems receive a trigger. Reporting
165
         * MonitoredItems send out Notifications anyway and disabled
166
         * MonitoredItems don't create samples to send. */
167
0
        if(triggeredMon->monitoringMode != UA_MONITORINGMODE_SAMPLING)
168
0
            continue;
169
170
        /* Get the latest sampled Notification from the triggered MonitoredItem.
171
         * Enqueue for publication. */
172
0
        UA_Notification *n2 = TAILQ_LAST(&triggeredMon->queue, NotificationQueue);
173
0
        if(n2)
174
0
            UA_Notification_enqueueSub(n2);
175
176
        /* The next Notification within the publishing interval is going to be
177
         * published as well. (Falsely) assume that the publishing cycle has
178
         * started right now, so that we don't have to loop over MonitoredItems
179
         * to deactivate the triggering after the publishing cycle. */
180
0
        triggeredMon->triggeredUntil = nowMonotonic +
181
0
            (UA_DateTime)(sub->publishingInterval * (UA_Double)UA_DATETIME_MSEC);
182
183
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
184
0
                                  "MonitoredItem %u triggers MonitoredItem %u",
185
0
                                  mon->monitoredItemId, triggeredMon->monitoredItemId);
186
0
    }
187
188
    /* If we just enqueued a notification into the local adminSubscription, then
189
     * register a delayed callback for "local publishing". */
190
0
    if(sub == server->adminSubscription && !sub->delayedCallbackRegistered) {
191
0
        sub->delayedCallbackRegistered = true;
192
0
        sub->delayedMoreNotifications.callback =
193
0
            (UA_Callback)UA_Subscription_localPublish;
194
0
        sub->delayedMoreNotifications.application = server;
195
0
        sub->delayedMoreNotifications.context = sub;
196
197
0
        el = server->config.eventLoop;
198
0
        el->addDelayedCallback(el, &sub->delayedMoreNotifications);
199
0
    }
200
0
}
201
202
/* Remove from the MonitoredItem queue. This only happens if the Notification is
203
 * deleted right after. */
204
static void
205
0
UA_Notification_dequeueMon(UA_Notification *n) {
206
0
    UA_MonitoredItem *mon = n->mon;
207
0
    UA_assert(mon);
208
209
    /* Remove from the MonitoredItem queue */
210
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
211
0
    if(n->isOverflowEvent)
212
0
        --mon->eventOverflows;
213
0
#endif
214
215
0
    TAILQ_REMOVE(&mon->queue, n, monEntry);
216
0
    --mon->queueSize;
217
218
    /* Test for consistency */
219
0
    UA_assert(mon->queueSize >= mon->eventOverflows);
220
0
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
221
0
}
222
223
void
224
0
UA_Notification_dequeueSub(UA_Notification *n) {
225
0
    if(TAILQ_NEXT(n, subEntry) == UA_SUBSCRIPTION_QUEUE_SENTINEL)
226
0
        return;
227
228
0
    UA_MonitoredItem *mon = n->mon;
229
0
    UA_assert(mon);
230
0
    UA_Subscription *sub = mon->subscription;
231
0
    UA_assert(sub);
232
233
0
    switch(mon->itemToMonitor.attributeId) {
234
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
235
0
    case UA_ATTRIBUTEID_EVENTNOTIFIER:
236
0
        --sub->eventNotifications;
237
0
        break;
238
0
#endif
239
0
    default:
240
0
        --sub->dataChangeNotifications;
241
0
        break;
242
0
    }
243
244
0
    TAILQ_REMOVE(&sub->notificationQueue, n, subEntry);
245
0
    --sub->notificationQueueSize;
246
247
    /* Reset the sentinel */
248
0
    TAILQ_NEXT(n, subEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
249
0
}
250
251
/****************/
252
/* Subscription */
253
/****************/
254
255
UA_Subscription *
256
502
UA_Subscription_new(void) {
257
    /* Allocate the memory */
258
502
    UA_Subscription *newSub = (UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription));
259
502
    if(!newSub)
260
0
        return NULL;
261
262
    /* The first publish response is sent immediately */
263
502
    newSub->state = UA_SUBSCRIPTIONSTATE_STOPPED;
264
265
    /* Even if the first publish response is a keepalive the sequence number is 1.
266
     * This can happen by a subscription without a monitored item (see CTT test scripts). */
267
502
    newSub->nextSequenceNumber = 1;
268
269
502
    TAILQ_INIT(&newSub->retransmissionQueue);
270
502
    TAILQ_INIT(&newSub->notificationQueue);
271
502
    return newSub;
272
502
}
273
274
static void
275
502
delayedFreeSubscription(void *app, void *context) {
276
502
    UA_free(context);
277
502
}
278
279
void
280
502
UA_Subscription_delete(UA_Server *server, UA_Subscription *sub) {
281
502
    UA_LOCK_ASSERT(&server->serviceMutex);
282
283
502
    UA_EventLoop *el = server->config.eventLoop;
284
285
    /* Unregister the publish callback and possible delayed callback */
286
502
    Subscription_setState(server, sub, UA_SUBSCRIPTIONSTATE_REMOVING);
287
288
    /* Remove delayed callbacks for processing remaining notifications */
289
502
    if(sub->delayedCallbackRegistered) {
290
0
        el->removeDelayedCallback(el, &sub->delayedMoreNotifications);
291
0
        sub->delayedCallbackRegistered = false;
292
0
    }
293
294
    /* Remove the diagnostics object for the subscription */
295
502
#ifdef UA_ENABLE_DIAGNOSTICS
296
502
    if(!UA_NodeId_isNull(&sub->ns0Id))
297
0
        deleteNode(server, sub->ns0Id, true);
298
502
    UA_NodeId_clear(&sub->ns0Id);
299
502
#endif
300
301
502
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub, "Subscription deleted");
302
303
    /* Detach from the session if necessary */
304
502
    if(sub->session)
305
502
        UA_Session_detachSubscription(server, sub->session, sub, true);
306
307
    /* Remove from the server if not previously registered */
308
502
    if(sub->serverListEntry.le_prev) {
309
0
        LIST_REMOVE(sub, serverListEntry);
310
0
        UA_assert(server->subscriptionsSize > 0);
311
0
        server->subscriptionsSize--;
312
0
        server->serverDiagnosticsSummary.currentSubscriptionCount--;
313
0
    }
314
315
    /* Delete monitored Items */
316
502
    UA_assert(server->monitoredItemsSize >= sub->monitoredItemsSize);
317
502
    UA_MonitoredItem *mon, *tmp_mon;
318
502
    LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmp_mon) {
319
0
        UA_MonitoredItem_delete(server, mon);
320
0
    }
321
502
    UA_assert(sub->monitoredItemsSize == 0);
322
323
    /* Delete Retransmission Queue */
324
502
    UA_NotificationMessageEntry *nme, *nme_tmp;
325
502
    TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) {
326
0
        TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
327
0
        UA_NotificationMessage_clear(&nme->message);
328
0
        UA_free(nme);
329
0
        if(sub->session)
330
0
            --sub->session->totalRetransmissionQueueSize;
331
0
        --sub->retransmissionQueueSize;
332
0
    }
333
502
    UA_assert(sub->retransmissionQueueSize == 0);
334
335
    /* Pointers to the subscription may still exist upwards in the call stack.
336
     * Add a delayed callback to remove the Subscription when the current jobs
337
     * have completed. */
338
502
    sub->delayedFreePointers.callback = delayedFreeSubscription;
339
502
    sub->delayedFreePointers.application = NULL;
340
502
    sub->delayedFreePointers.context = sub;
341
502
    el->addDelayedCallback(el, &sub->delayedFreePointers);
342
502
}
343
344
void
345
0
Subscription_resetLifetime(UA_Subscription *sub) {
346
0
    sub->currentLifetimeCount = 0;
347
0
}
348
349
UA_MonitoredItem *
350
0
UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId) {
351
0
    UA_MonitoredItem *mon;
352
0
    LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
353
0
        if(mon->monitoredItemId == monitoredItemId)
354
0
            break;
355
0
    }
356
0
    return mon;
357
0
}
358
359
static void
360
0
removeOldestRetransmissionMessageFromSub(UA_Subscription *sub) {
361
0
    UA_NotificationMessageEntry *oldestEntry =
362
0
        TAILQ_LAST(&sub->retransmissionQueue, NotificationMessageQueue);
363
0
    TAILQ_REMOVE(&sub->retransmissionQueue, oldestEntry, listEntry);
364
0
    UA_NotificationMessage_clear(&oldestEntry->message);
365
0
    UA_free(oldestEntry);
366
0
    --sub->retransmissionQueueSize;
367
0
    if(sub->session)
368
0
        --sub->session->totalRetransmissionQueueSize;
369
370
0
#ifdef UA_ENABLE_DIAGNOSTICS
371
0
    sub->discardedMessageCount++;
372
0
#endif
373
0
}
374
375
static void
376
0
removeOldestRetransmissionMessageFromSession(UA_Session *session) {
377
0
    UA_NotificationMessageEntry *oldestEntry = NULL;
378
0
    UA_Subscription *oldestSub = NULL;
379
0
    UA_Subscription *sub;
380
0
    TAILQ_FOREACH(sub, &session->subscriptions, sessionListEntry) {
381
0
        UA_NotificationMessageEntry *first =
382
0
            TAILQ_LAST(&sub->retransmissionQueue, NotificationMessageQueue);
383
0
        if(!first)
384
0
            continue;
385
0
        if(!oldestEntry || oldestEntry->message.publishTime > first->message.publishTime) {
386
0
            oldestEntry = first;
387
0
            oldestSub = sub;
388
0
        }
389
0
    }
390
0
    UA_assert(oldestEntry);
391
0
    UA_assert(oldestSub);
392
393
0
    removeOldestRetransmissionMessageFromSub(oldestSub);
394
0
}
395
396
static void
397
UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
398
0
                                         UA_NotificationMessageEntry *entry) {
399
    /* Release the oldest entry if there is not enough space */
400
0
    UA_Session *session = sub->session;
401
0
    if(sub->retransmissionQueueSize >= UA_MAX_RETRANSMISSIONQUEUESIZE) {
402
0
        UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
403
0
                                    "Subscription retransmission queue overflow");
404
0
        removeOldestRetransmissionMessageFromSub(sub);
405
0
    } else if(session && server->config.maxRetransmissionQueueSize > 0 &&
406
0
              session->totalRetransmissionQueueSize >=
407
0
              server->config.maxRetransmissionQueueSize) {
408
0
        UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
409
0
                                    "Session-wide retransmission queue overflow");
410
0
        removeOldestRetransmissionMessageFromSession(sub->session);
411
0
    }
412
413
    /* Add entry */
414
0
    TAILQ_INSERT_TAIL(&sub->retransmissionQueue, entry, listEntry);
415
0
    ++sub->retransmissionQueueSize;
416
0
    if(session)
417
0
        ++session->totalRetransmissionQueueSize;
418
0
}
419
420
UA_StatusCode
421
0
UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
422
    /* Find the retransmission message */
423
0
    UA_NotificationMessageEntry *entry;
424
0
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
425
0
        if(entry->message.sequenceNumber == sequenceNumber)
426
0
            break;
427
0
    }
428
0
    if(!entry)
429
0
        return UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
430
431
    /* Remove the retransmission message */
432
0
    TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry);
433
0
    --sub->retransmissionQueueSize;
434
0
    UA_NotificationMessage_clear(&entry->message);
435
0
    UA_free(entry);
436
437
0
    if(sub->session)
438
0
        --sub->session->totalRetransmissionQueueSize;
439
440
0
    return UA_STATUSCODE_GOOD;
441
0
}
442
443
/* The output counters are only set when the preparation is successful */
444
static UA_StatusCode
445
prepareNotificationMessage(UA_Server *server, UA_Subscription *sub,
446
                           UA_NotificationMessage *message,
447
0
                           size_t maxNotifications) {
448
0
    UA_assert(maxNotifications > 0);
449
450
    /* Allocate an ExtensionObject for Event- and DataChange-Notifications. Also
451
     * there can be StatusChange-Notifications. The standard says in Part 4,
452
     * 7.2.1:
453
     *
454
     * If a Subscription contains MonitoredItems for events and data, this array
455
     * should have not more than 2 elements. */
456
0
    message->notificationData = (UA_ExtensionObject*)
457
0
        UA_Array_new(2, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
458
0
    if(!message->notificationData)
459
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
460
0
    message->notificationDataSize = 2;
461
462
    /* Pre-allocate DataChangeNotifications */
463
0
    size_t notificationDataIdx = 0;
464
0
    size_t dcnPos = 0; /* How many DataChangeNotifications? */
465
0
    UA_DataChangeNotification *dcn = NULL;
466
0
    if(sub->dataChangeNotifications > 0) {
467
0
        dcn = UA_DataChangeNotification_new();
468
0
        if(!dcn) {
469
0
            UA_NotificationMessage_clear(message);
470
0
            return UA_STATUSCODE_BADOUTOFMEMORY;
471
0
        }
472
0
        UA_ExtensionObject_setValue(message->notificationData, dcn,
473
0
                                    &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
474
0
        size_t dcnSize = sub->dataChangeNotifications;
475
0
        if(dcnSize > maxNotifications)
476
0
            dcnSize = maxNotifications;
477
0
        dcn->monitoredItems = (UA_MonitoredItemNotification*)
478
0
            UA_Array_new(dcnSize, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
479
0
        if(!dcn->monitoredItems) {
480
0
            UA_NotificationMessage_clear(message); /* Also frees the dcn */
481
0
            return UA_STATUSCODE_BADOUTOFMEMORY;
482
0
        }
483
0
        dcn->monitoredItemsSize = dcnSize;
484
0
        notificationDataIdx++;
485
0
    }
486
487
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
488
0
    size_t enlPos = 0; /* How many EventNotifications? */
489
0
    UA_EventNotificationList *enl = NULL;
490
0
    if(sub->eventNotifications > 0) {
491
0
        enl = UA_EventNotificationList_new();
492
0
        if(!enl) {
493
0
            UA_NotificationMessage_clear(message);
494
0
            return UA_STATUSCODE_BADOUTOFMEMORY;
495
0
        }
496
0
        UA_ExtensionObject_setValue(&message->notificationData[notificationDataIdx],
497
0
                                    enl, &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]);
498
0
        size_t enlSize = sub->eventNotifications;
499
0
        if(enlSize > maxNotifications)
500
0
            enlSize = maxNotifications;
501
0
        enl->events = (UA_EventFieldList*)
502
0
            UA_Array_new(enlSize, &UA_TYPES[UA_TYPES_EVENTFIELDLIST]);
503
0
        if(!enl->events) {
504
0
            UA_NotificationMessage_clear(message);
505
0
            return UA_STATUSCODE_BADOUTOFMEMORY;
506
0
        }
507
0
        enl->eventsSize = enlSize;
508
0
        notificationDataIdx++;
509
0
    }
510
0
#endif
511
512
0
    UA_assert(notificationDataIdx > 0);
513
0
    message->notificationDataSize = notificationDataIdx;
514
515
    /* <-- The point of no return --> */
516
517
    /* How many notifications were moved to the response overall? */
518
0
    size_t totalNotifications = 0;
519
0
    UA_Notification *n, *n_tmp;
520
0
    TAILQ_FOREACH_SAFE(n, &sub->notificationQueue, subEntry, n_tmp) {
521
0
        if(totalNotifications >= maxNotifications)
522
0
            break;
523
524
        /* Move the content to the response */
525
0
        switch(n->mon->itemToMonitor.attributeId) {
526
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
527
0
        case UA_ATTRIBUTEID_EVENTNOTIFIER:
528
0
            UA_assert(enl != NULL); /* Have at least one event notification */
529
0
            enl->events[enlPos] = n->data.event;
530
0
            UA_EventFieldList_init(&n->data.event);
531
0
            enlPos++;
532
0
            break;
533
0
#endif
534
0
        default:
535
0
            UA_assert(dcn != NULL); /* Have at least one change notification */
536
0
            dcn->monitoredItems[dcnPos] = n->data.dataChange;
537
0
            UA_DataValue_init(&n->data.dataChange.value);
538
0
            dcnPos++;
539
0
            break;
540
0
        }
541
542
        /* If there are Notifications *before this one* in the MonitoredItem-
543
         * local queue, remove all of them. These are earlier Notifications that
544
         * are non-reporting. And we don't want them to show up after the
545
         * current Notification has been sent out. */
546
0
        UA_Notification *prev;
547
0
        while((prev = TAILQ_PREV(n, NotificationQueue, monEntry))) {
548
0
            UA_Notification_delete(prev);
549
550
            /* Help the Clang scan-analyzer */
551
0
            UA_assert(prev != TAILQ_PREV(n, NotificationQueue, monEntry));
552
0
        }
553
554
        /* Delete the notification, remove from the queues and decrease the counters */
555
0
        UA_Notification_delete(n);
556
557
0
        totalNotifications++;
558
0
    }
559
560
    /* Set sizes */
561
0
    if(dcn) {
562
0
        dcn->monitoredItemsSize = dcnPos;
563
0
        if(dcnPos == 0) {
564
0
            UA_free(dcn->monitoredItems);
565
0
            dcn->monitoredItems = NULL;
566
0
        }
567
0
    }
568
569
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
570
0
    if(enl) {
571
0
        enl->eventsSize = enlPos;
572
0
        if(enlPos == 0) {
573
0
            UA_free(enl->events);
574
0
            enl->events = NULL;
575
0
        }
576
0
    }
577
0
#endif
578
579
0
    return UA_STATUSCODE_GOOD;
580
0
}
581
582
/* According to OPC Unified Architecture, Part 4 5.13.1.1 i) The value 0 is
583
 * never used for the sequence number */
584
static UA_UInt32
585
0
UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) {
586
0
    UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
587
0
    if(nextSequenceNumber == 0)
588
0
        nextSequenceNumber = 1;
589
0
    return nextSequenceNumber;
590
0
}
591
592
static void
593
sendStatusChangeDelete(UA_Server *server, UA_Subscription *sub,
594
0
                       UA_PublishResponseEntry *pre) {
595
    /* Cannot send out the StatusChange because no response is queued. Delete
596
     * the Subscription without sending the StatusChange. */
597
0
    if(!pre) {
598
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
599
0
                                  "Cannot send the StatusChange notification because "
600
0
                                  "no response is queued.");
601
0
        if(UA_StatusCode_isBad(sub->statusChange)) {
602
0
            UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
603
0
                                      "Removing the subscription.");
604
0
            UA_Subscription_delete(server, sub);
605
0
        }
606
0
        return;
607
0
    }
608
609
0
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
610
0
                              "Sending out a StatusChange notification and "
611
0
                              "removing the subscription");
612
613
0
    UA_EventLoop *el = server->config.eventLoop;
614
615
    /* Populate the response */
616
0
    UA_PublishResponse *response = &pre->response;
617
618
0
    UA_StatusChangeNotification scn;
619
0
    UA_StatusChangeNotification_init(&scn);
620
0
    scn.status = sub->statusChange;
621
622
0
    UA_ExtensionObject notificationData;
623
0
    UA_ExtensionObject_setValue(&notificationData, &scn,
624
0
                                &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]);
625
626
0
    response->notificationMessage.notificationData = &notificationData;
627
0
    response->notificationMessage.notificationDataSize = 1;
628
0
    response->subscriptionId = sub->subscriptionId;
629
0
    response->notificationMessage.publishTime = el->dateTime_now(el);
630
0
    response->notificationMessage.sequenceNumber = sub->nextSequenceNumber;
631
632
    /* Send the response */
633
0
    UA_assert(sub->session); /* Otherwise pre is NULL */
634
0
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
635
0
                              "Sending out a publish response");
636
0
    sendResponse(server, sub->session->channel, pre->requestId,
637
0
                 (UA_Response *)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
638
639
    /* Clean up */
640
0
    response->notificationMessage.notificationData = NULL;
641
0
    response->notificationMessage.notificationDataSize = 0;
642
0
    UA_PublishResponse_clear(&pre->response);
643
0
    UA_free(pre);
644
645
    /* Delete the subscription */
646
0
    UA_Subscription_delete(server, sub);
647
0
}
648
649
/* The local adminSubscription forwards notifications to a registered callback
650
 * method. This is done async from a delayed callback registered in the
651
 * EventLoop. */
652
void
653
0
UA_Subscription_localPublish(UA_Server *server, UA_Subscription *sub) {
654
0
    lockServer(server);
655
0
    sub->delayedCallbackRegistered = false;
656
657
0
    UA_Notification *n, *n_tmp;
658
0
    TAILQ_FOREACH_SAFE(n, &sub->notificationQueue, subEntry, n_tmp) {
659
0
        UA_MonitoredItem *mon = n->mon;
660
0
        UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*)mon;
661
662
        /* Move the content to the response */
663
0
        void *nodeContext = NULL;
664
0
        switch(mon->itemToMonitor.attributeId) {
665
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
666
0
        case UA_ATTRIBUTEID_EVENTNOTIFIER:
667
            /* Set the fields in the key-value map */
668
0
            UA_assert(n->data.event.eventFieldsSize == localMon->eventFields.mapSize);
669
0
            for(size_t i = 0; i < localMon->eventFields.mapSize; i++) {
670
0
                localMon->eventFields.map[i].value = n->data.event.eventFields[i];
671
0
            }
672
673
            /* Call the callback */
674
0
            localMon->callback.
675
0
                eventCallback(server, mon->monitoredItemId, localMon->context,
676
0
                              localMon->eventFields);
677
0
            break;
678
0
#endif
679
0
        default:
680
0
            getNodeContext(server, mon->itemToMonitor.nodeId, &nodeContext);
681
0
            localMon->callback.
682
0
                dataChangeCallback(server, mon->monitoredItemId, localMon->context,
683
0
                                   &mon->itemToMonitor.nodeId, nodeContext,
684
0
                                   mon->itemToMonitor.attributeId,
685
0
                                   &n->data.dataChange.value);
686
0
            break;
687
0
        }
688
689
        /* If there are Notifications *before this one* in the MonitoredItem-
690
         * local queue, remove all of them. These are earlier Notifications that
691
         * are non-reporting. And we don't want them to show up after the
692
         * current Notification has been sent out. */
693
0
        UA_Notification *prev;
694
0
        while((prev = TAILQ_PREV(n, NotificationQueue, monEntry))) {
695
0
            UA_Notification_delete(prev);
696
697
            /* Help the Clang scan-analyzer */
698
0
            UA_assert(prev != TAILQ_PREV(n, NotificationQueue, monEntry));
699
0
        }
700
701
        /* Delete the notification, remove from the queues and decrease the counters */
702
0
        UA_Notification_delete(n);
703
0
    }
704
705
0
    unlockServer(server);
706
0
}
707
708
static void
709
0
delayedPublishNotifications(UA_Server *server, UA_Subscription *sub) {
710
0
    lockServer(server);
711
0
    sub->delayedCallbackRegistered = false;
712
0
    UA_Subscription_publish(server, sub);
713
0
    unlockServer(server);
714
0
}
715
716
/* Try to publish now. Enqueue a "next publish" as a delayed callback if not
717
 * done. */
718
void
719
0
UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
720
0
    UA_EventLoop *el = server->config.eventLoop;
721
722
    /* Get a response */
723
0
    UA_PublishResponseEntry *pre = NULL;
724
0
    if(sub->session) {
725
0
        UA_DateTime nowMonotonic = el->dateTime_nowMonotonic(el);
726
0
        do {
727
            /* Dequeue the oldest response */
728
0
            pre = UA_Session_dequeuePublishReq(sub->session);
729
0
            if(!pre)
730
0
                break;
731
732
            /* Check if the TimeoutHint is still valid. Otherwise return with a bad
733
             * statuscode and continue. */
734
0
            if(pre->maxTime < nowMonotonic) {
735
0
                UA_LOG_DEBUG_SESSION(server->config.logging, sub->session,
736
0
                                     "Publish request %u has timed out", pre->requestId);
737
0
                pre->response.responseHeader.serviceResult = UA_STATUSCODE_BADTIMEOUT;
738
0
                sendResponse(server, sub->session->channel, pre->requestId,
739
0
                             (UA_Response *)&pre->response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
740
0
                UA_PublishResponse_clear(&pre->response);
741
0
                UA_free(pre);
742
0
                pre = NULL;
743
0
            }
744
0
        } while(!pre);
745
0
    }
746
747
    /* Update the LifetimeCounter */
748
0
    if(pre) {
749
0
        Subscription_resetLifetime(sub);
750
0
    } else {
751
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
752
0
                                  "The publish queue is empty");
753
0
        ++sub->currentLifetimeCount;
754
0
        if(sub->currentLifetimeCount > sub->lifeTimeCount) {
755
0
            UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
756
0
                                        "End of subscription lifetime");
757
            /* Set the StatusChange to delete the subscription. */
758
0
            sub->statusChange = UA_STATUSCODE_BADTIMEOUT;
759
0
        }
760
0
    }
761
762
    /* Send a StatusChange notification if possible and delete the
763
     * Subscription */
764
0
    if(sub->statusChange != UA_STATUSCODE_GOOD) {
765
0
        sendStatusChangeDelete(server, sub, pre);
766
0
        return;
767
0
    }
768
769
    /* Dsiabled subscriptions do not send notifications */
770
0
    UA_UInt32 notifications = (sub->state == UA_SUBSCRIPTIONSTATE_ENABLED) ?
771
0
        sub->notificationQueueSize : 0;
772
773
    /* Limit the number of notifications to the configured maximum */
774
0
    if(notifications > sub->notificationsPerPublish)
775
0
        notifications = sub->notificationsPerPublish;
776
777
    /* Return if no notifications and no keepalive */
778
0
    if(notifications == 0) {
779
0
        ++sub->currentKeepAliveCount;
780
0
        if(sub->currentKeepAliveCount < sub->maxKeepAliveCount) {
781
0
            if(pre)
782
0
                UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
783
0
            return;
784
0
        }
785
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub, "Sending a KeepAlive");
786
0
    }
787
788
    /* We want to send a response, but cannot. Either because there is no queued
789
     * response or because the Subscription is detached from a Session or because
790
     * the SecureChannel for the Session is closed. */
791
0
    if(!pre || !sub->session || !sub->session->channel) {
792
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
793
0
                                  "Want to send a publish response but cannot. "
794
0
                                  "The subscription is late.");
795
0
        sub->late = true;
796
0
        if(pre)
797
0
            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
798
0
        return;
799
0
    }
800
801
0
    UA_assert(pre);
802
0
    UA_assert(sub->session); /* Otherwise pre is NULL */
803
804
    /* Prepare the response */
805
0
    UA_PublishResponse *response = &pre->response;
806
0
    UA_NotificationMessage *message = &response->notificationMessage;
807
0
    UA_NotificationMessageEntry *retransmission = NULL;
808
0
#ifdef UA_ENABLE_DIAGNOSTICS
809
0
    size_t priorDataChangeNotifications = sub->dataChangeNotifications;
810
0
    size_t priorEventNotifications = sub->eventNotifications;
811
0
#endif
812
0
    if(notifications > 0) {
813
0
        if(server->config.enableRetransmissionQueue) {
814
            /* Allocate the retransmission entry */
815
0
            retransmission = (UA_NotificationMessageEntry*)
816
0
                UA_malloc(sizeof(UA_NotificationMessageEntry));
817
0
            if(!retransmission) {
818
0
                UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
819
0
                                            "Could not allocate memory for retransmission. "
820
0
                                            "The subscription is late.");
821
0
                sub->late = true;
822
0
                UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
823
0
                return;
824
0
            }
825
0
        }
826
827
        /* Prepare the response */
828
0
        UA_StatusCode retval =
829
0
            prepareNotificationMessage(server, sub, message, notifications);
830
0
        if(retval != UA_STATUSCODE_GOOD) {
831
0
            UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
832
0
                                        "Could not prepare the notification message. "
833
0
                                        "The subscription is late.");
834
            /* If the retransmission queue is enabled a retransmission message is allocated */
835
0
            if(retransmission)
836
0
                UA_free(retransmission);
837
0
            sub->late = true;
838
0
            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
839
0
            return;
840
0
        }
841
0
    }
842
843
    /* <-- The point of no return --> */
844
845
    /* Set up the response */
846
0
    response->subscriptionId = sub->subscriptionId;
847
0
    response->moreNotifications = (sub->notificationQueueSize > 0);
848
0
    message->publishTime = el->dateTime_now(el);
849
850
    /* Set sequence number to message. Started at 1 which is given during
851
     * creating a new subscription. The 1 is required for initial publish
852
     * response with or without an monitored item. */
853
0
    message->sequenceNumber = sub->nextSequenceNumber;
854
855
0
    if(notifications > 0) {
856
        /* If the retransmission queue is enabled a retransmission message is
857
         * allocated */
858
0
        if(retransmission) {
859
            /* Put the notification message into the retransmission queue. This
860
             * needs to be done here, so that the message itself is included in
861
             * the available sequence numbers for acknowledgement. */
862
0
            retransmission->message = response->notificationMessage;
863
0
            UA_Subscription_addRetransmissionMessage(server, sub, retransmission);
864
0
        }
865
        /* Only if a notification was created, the sequence number must be
866
         * increased. For a keepalive the sequence number can be reused. */
867
0
        sub->nextSequenceNumber =
868
0
            UA_Subscription_nextSequenceNumber(sub->nextSequenceNumber);
869
0
    }
870
871
    /* Get the available sequence numbers from the retransmission queue */
872
0
    UA_assert(sub->retransmissionQueueSize <= UA_MAX_RETRANSMISSIONQUEUESIZE);
873
0
    UA_UInt32 seqNumbers[UA_MAX_RETRANSMISSIONQUEUESIZE];
874
0
    response->availableSequenceNumbers = seqNumbers;
875
0
    response->availableSequenceNumbersSize = sub->retransmissionQueueSize;
876
0
    size_t i = 0;
877
0
    UA_NotificationMessageEntry *nme;
878
0
    TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
879
0
        response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
880
0
        ++i;
881
0
    }
882
0
    UA_assert(i == sub->retransmissionQueueSize);
883
884
    /* Send the response */
885
0
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
886
0
                              "Sending out a publish response with %" PRIu32
887
0
                              " notifications", notifications);
888
0
    sendResponse(server, sub->session->channel, pre->requestId,
889
0
                 (UA_Response*)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
890
891
    /* Reset the Subscription state to NORMAL. But only if all notifications
892
     * have been sent out. Otherwise keep the Subscription in the LATE state. So
893
     * we immediately answer incoming Publish requests.
894
     *
895
     * (We also check that session->responseQueueSize > 0 in Service_Publish. To
896
     * avoid answering Publish requests out of order. As we additionally may have
897
     * scheduled a publish callback as a delayed callback. */
898
0
    if(sub->notificationQueueSize == 0)
899
0
        sub->late = false;
900
901
    /* Reset the KeepAlive after publishing */
902
0
    sub->currentKeepAliveCount = 0;
903
904
    /* Free the response */
905
0
    if(retransmission) {
906
        /* NotificationMessage was moved into retransmission queue */
907
0
        UA_NotificationMessage_init(&response->notificationMessage);
908
0
    }
909
0
    response->availableSequenceNumbers = NULL;
910
0
    response->availableSequenceNumbersSize = 0;
911
0
    UA_PublishResponse_clear(&pre->response);
912
0
    UA_free(pre);
913
914
    /* Update the diagnostics statistics */
915
0
#ifdef UA_ENABLE_DIAGNOSTICS
916
0
    sub->publishRequestCount++;
917
918
0
    UA_UInt32 sentDCN = (UA_UInt32)
919
0
        (priorDataChangeNotifications - sub->dataChangeNotifications);
920
0
    UA_UInt32 sentEN = (UA_UInt32)(priorEventNotifications - sub->eventNotifications);
921
0
    sub->dataChangeNotificationsCount += sentDCN;
922
0
    sub->eventNotificationsCount += sentEN;
923
0
    sub->notificationsCount += (sentDCN + sentEN);
924
0
#endif
925
926
    /* Repeat sending notifications if there are more notifications to send. But
927
     * only call UA_MonitoredItem_sample in the regular publish callback. */
928
0
    UA_Boolean done = (sub->notificationQueueSize == 0);
929
0
    if(!done && !sub->delayedCallbackRegistered) {
930
0
        sub->delayedCallbackRegistered = true;
931
932
0
        sub->delayedMoreNotifications.callback = (UA_Callback)delayedPublishNotifications;
933
0
        sub->delayedMoreNotifications.application = server;
934
0
        sub->delayedMoreNotifications.context = sub;
935
936
0
        el = server->config.eventLoop;
937
0
        el->addDelayedCallback(el, &sub->delayedMoreNotifications);
938
0
    }
939
0
}
940
941
void
942
0
UA_Subscription_resendData(UA_Server *server, UA_Subscription *sub) {
943
0
    UA_LOCK_ASSERT(&server->serviceMutex);
944
0
    UA_assert(server);
945
0
    UA_assert(sub);
946
947
    /* Part 4, §6.7: If this Method is called, subsequent Publish responses
948
     * shall contain the current values of all data MonitoredItems in the
949
     * Subscription where the MonitoringMode is set to Reporting. If a value is
950
     * queued for a data MonitoredItem, the next value in the queue is sent in
951
     * the Publish response. If no value is queued for a data MonitoredItem, the
952
     * last value sent is repeated in the Publish response. */
953
0
    UA_MonitoredItem *mon;
954
0
    LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
955
        /* Create only DataChange notifications */
956
0
        if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
957
0
            continue;
958
959
        /* Only if the mode is monitoring */
960
0
        if(mon->monitoringMode != UA_MONITORINGMODE_REPORTING)
961
0
            continue;
962
963
        /* If a value is queued for a data MonitoredItem, the next value in
964
         * the queue is sent in the Publish response. */
965
0
        if(mon->queueSize > 0)
966
0
            continue;
967
968
        /* Create a notification with the last sampled value */
969
0
        UA_MonitoredItem_createDataChangeNotification(server, mon, &mon->lastValue);
970
0
    }
971
0
}
972
973
void
974
0
UA_Session_ensurePublishQueueSpace(UA_Server* server, UA_Session* session) {
975
0
    if(server->config.maxPublishReqPerSession == 0)
976
0
        return;
977
978
0
    while(session->responseQueueSize >= server->config.maxPublishReqPerSession) {
979
        /* Dequeue a response */
980
0
        UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(session);
981
0
        UA_assert(pre != NULL); /* There must be a pre as session->responseQueueSize > 0 */
982
983
0
        UA_LOG_DEBUG_SESSION(server->config.logging, session,
984
0
                             "Sending out a publish response triggered by too many publish requests");
985
986
        /* Send the response. This response has no related subscription id */
987
0
        UA_PublishResponse *response = &pre->response;
988
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS;
989
0
        sendResponse(server, session->channel, pre->requestId,
990
0
                     (UA_Response *)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
991
992
        /* Free the response */
993
0
        UA_PublishResponse_clear(response);
994
0
        UA_free(pre);
995
0
    }
996
0
}
997
998
static void
999
0
sampleAndPublishCallback(UA_Server *server, UA_Subscription *sub) {
1000
0
    UA_assert(sub);
1001
1002
0
    lockServer(server);
1003
1004
0
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
1005
0
                              "Sample and Publish Callback");
1006
1007
    /* Sample the MonitoredItems with sampling interval <0 (which implies
1008
     * sampling in the same interval as the subscription) */
1009
0
    UA_MonitoredItem *mon;
1010
0
    LIST_FOREACH(mon, &sub->samplingMonitoredItems, sampling.subscriptionSampling) {
1011
0
        UA_MonitoredItem_sample(server, mon);
1012
0
    }
1013
1014
    /* Publish the queued notifications */
1015
0
    UA_Subscription_publish(server, sub);
1016
1017
0
    unlockServer(server);
1018
0
}
1019
1020
UA_StatusCode
1021
Subscription_setState(UA_Server *server, UA_Subscription *sub,
1022
502
                      UA_SubscriptionState state) {
1023
502
    if(state <= UA_SUBSCRIPTIONSTATE_REMOVING) {
1024
502
        if(sub->publishCallbackId != 0) {
1025
0
            removeCallback(server, sub->publishCallbackId);
1026
0
            sub->publishCallbackId = 0;
1027
0
#ifdef UA_ENABLE_DIAGNOSTICS
1028
0
            sub->disableCount++;
1029
0
#endif
1030
0
        }
1031
502
    } else if(sub->publishCallbackId == 0) {
1032
0
        UA_StatusCode res =
1033
0
            addRepeatedCallback(server, (UA_ServerCallback)sampleAndPublishCallback,
1034
0
                                sub, sub->publishingInterval, &sub->publishCallbackId);
1035
0
        if(res != UA_STATUSCODE_GOOD) {
1036
0
            sub->state = UA_SUBSCRIPTIONSTATE_STOPPED;
1037
0
            return res;
1038
0
        }
1039
1040
        /* Send (at least a) keepalive after the next publish interval */
1041
0
        sub->currentKeepAliveCount = sub->maxKeepAliveCount;
1042
1043
0
#ifdef UA_ENABLE_DIAGNOSTICS
1044
0
        sub->enableCount++;
1045
0
#endif
1046
0
    }
1047
1048
502
    sub->state = state;
1049
502
    return UA_STATUSCODE_GOOD;
1050
502
}
1051
1052
/****************/
1053
/* Notification */
1054
/****************/
1055
1056
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
1057
1058
/* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
1059
 * "is generated when the first Event has to be discarded [...] without
1060
 * discarding any other event". So only generate one for all deleted events. */
1061
static UA_StatusCode
1062
createEventOverflowNotification(UA_Server *server, UA_Subscription *sub,
1063
0
                                UA_MonitoredItem *mon) {
1064
    /* Avoid creating two adjacent overflow events */
1065
0
    UA_Notification *indicator = NULL;
1066
0
    if(mon->parameters.discardOldest) {
1067
0
        indicator = TAILQ_FIRST(&mon->queue);
1068
0
        UA_assert(indicator); /* must exist */
1069
0
        if(indicator->isOverflowEvent)
1070
0
            return UA_STATUSCODE_GOOD;
1071
0
    } else {
1072
0
        indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
1073
0
        UA_assert(indicator); /* must exist */
1074
        /* Skip the last element. It is the recently added notification that
1075
         * shall be kept. We know it is not an OverflowEvent. */
1076
0
        UA_Notification *before = TAILQ_PREV(indicator, NotificationQueue, monEntry);
1077
0
        if(before && before->isOverflowEvent)
1078
0
            return UA_STATUSCODE_GOOD;
1079
0
    }
1080
1081
    /* A Notification is inserted into the queue which includes only the
1082
     * NodeId of the OverflowEventType. */
1083
1084
    /* Get the EventFilter */
1085
0
    if(mon->parameters.filter.content.decoded.type != &UA_TYPES[UA_TYPES_EVENTFILTER])
1086
0
        return UA_STATUSCODE_BADINTERNALERROR;
1087
0
    const UA_EventFilter *ef = (const UA_EventFilter*)
1088
0
        mon->parameters.filter.content.decoded.data;
1089
0
    if(ef->selectClausesSize == 0)
1090
0
        return UA_STATUSCODE_BADINTERNALERROR;
1091
1092
    /* Initialize the notification */
1093
0
    UA_Notification *n = UA_Notification_new();
1094
0
    if(!n)
1095
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
1096
0
    n->isOverflowEvent = true;
1097
0
    n->mon = mon;
1098
0
    n->data.event.clientHandle = mon->parameters.clientHandle;
1099
1100
    /* The session is needed to evaluate the select-clause. But used only for
1101
     * limited reads on the source node. So we can use the admin-session here if
1102
     * the subscription is detached. */
1103
0
    UA_Session *session = (sub->session) ? sub->session : &server->adminSession;
1104
1105
    /* Set up the context for the filter evaluation */
1106
0
    static UA_String sourceName = UA_STRING_STATIC("Internal/EventQueueOverflow");
1107
0
    UA_KeyValuePair fields[1];
1108
0
    fields[0].key = (UA_QualifiedName){1, UA_STRING_STATIC("/SourceName")};
1109
0
    UA_Variant_setScalar(&fields[0].value, &sourceName, &UA_TYPES[UA_TYPES_STRING]);
1110
0
    UA_KeyValueMap fieldMap = {1, fields};
1111
1112
0
    UA_FilterEvalContext ctx;
1113
0
    UA_FilterEvalContext_init(&ctx);
1114
0
    ctx.server = server;
1115
0
    ctx.session = session;
1116
0
    ctx.filter = *ef;
1117
0
    ctx.ed.sourceNode = UA_NS0ID(SERVER);
1118
0
    ctx.ed.eventType = UA_NS0ID(EVENTQUEUEOVERFLOWEVENTTYPE);
1119
0
    ctx.ed.severity = 201; /* TODO: Can this be configured? */
1120
0
    ctx.ed.message = UA_LOCALIZEDTEXT(NULL, NULL);
1121
0
    ctx.ed.eventFields = &fieldMap;
1122
1123
    /* Evaluate the select clause to populate the notification */
1124
0
    UA_StatusCode res = evaluateSelectClause(&ctx, &n->data.event);
1125
0
    UA_FilterEvalContext_reset(&ctx);
1126
0
    if(res != UA_STATUSCODE_GOOD) {
1127
0
        UA_free(n);
1128
0
        return res;
1129
0
    }
1130
1131
    /* Insert before the removed notification. This is either first in the
1132
     * queue (if the oldest notification was removed) or before the new event
1133
     * that remains the last element of the queue.
1134
     *
1135
     * Ensure that the following is consistent with UA_Notification_enqueueMon
1136
     * and UA_Notification_enqueueSub! */
1137
0
    TAILQ_INSERT_BEFORE(indicator, n, monEntry);
1138
0
    ++mon->eventOverflows;
1139
0
    ++mon->queueSize;
1140
1141
    /* Test for consistency */
1142
0
    UA_assert(mon->queueSize >= mon->eventOverflows);
1143
0
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
1144
1145
0
    if(TAILQ_NEXT(indicator, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
1146
        /* Insert just before the indicator */
1147
0
        TAILQ_INSERT_BEFORE(indicator, n, subEntry);
1148
0
    } else {
1149
        /* The indicator was not reporting or not added yet. */
1150
0
        if(!mon->parameters.discardOldest) {
1151
            /* Add last to the per-Subscription queue */
1152
0
            TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
1153
0
                              n, subEntry);
1154
0
        } else {
1155
            /* Find the oldest reported element. Add before that. */
1156
0
            while(indicator) {
1157
0
                indicator = TAILQ_PREV(indicator, NotificationQueue, monEntry);
1158
0
                if(!indicator) {
1159
0
                    TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
1160
0
                                      n, subEntry);
1161
0
                    break;
1162
0
                }
1163
0
                if(TAILQ_NEXT(indicator, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
1164
0
                    TAILQ_INSERT_BEFORE(indicator, n, subEntry);
1165
0
                    break;
1166
0
                }
1167
0
            }
1168
0
        }
1169
0
    }
1170
1171
0
    ++sub->notificationQueueSize;
1172
0
    ++sub->eventNotifications;
1173
1174
    /* Update the diagnostics statistics */
1175
0
#ifdef UA_ENABLE_DIAGNOSTICS
1176
0
    sub->eventQueueOverFlowCount++;
1177
0
#endif
1178
1179
0
    return UA_STATUSCODE_GOOD;
1180
0
}
1181
1182
#endif
1183
1184
/* Set the InfoBits that a datachange notification was removed */
1185
static void
1186
0
setOverflowInfoBits(UA_MonitoredItem *mon) {
1187
    /* Only for queues with more than one element */
1188
0
    if(mon->parameters.queueSize == 1)
1189
0
        return;
1190
1191
0
    UA_Notification *indicator = NULL;
1192
0
    if(mon->parameters.discardOldest) {
1193
0
        indicator = TAILQ_FIRST(&mon->queue);
1194
0
    } else {
1195
0
        indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
1196
0
    }
1197
0
    UA_assert(indicator); /* must exist */
1198
1199
0
    indicator->data.dataChange.value.hasStatus = true;
1200
0
    indicator->data.dataChange.value.status |=
1201
0
        (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
1202
0
}
1203
1204
/* Remove the InfoBits when the queueSize was reduced to 1 */
1205
void
1206
0
UA_MonitoredItem_removeOverflowInfoBits(UA_MonitoredItem *mon) {
1207
    /* Don't consider queue size > 1 and Event MonitoredItems */
1208
0
    if(mon->parameters.queueSize > 1 ||
1209
0
       mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
1210
0
        return;
1211
1212
    /* Get the first notification */
1213
0
    UA_Notification *n = TAILQ_FIRST(&mon->queue);
1214
0
    if(!n)
1215
0
        return;
1216
1217
    /* Assertion that at most one notification is in the queue */
1218
0
    UA_assert(n == TAILQ_LAST(&mon->queue, NotificationQueue));
1219
1220
    /* Remve the Infobits */
1221
0
    n->data.dataChange.value.status &= ~(UA_StatusCode)
1222
0
        (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
1223
0
}
1224
1225
/*****************/
1226
/* MonitoredItem */
1227
/*****************/
1228
1229
void
1230
0
UA_MonitoredItem_init(UA_MonitoredItem *mon) {
1231
0
    memset(mon, 0, sizeof(UA_MonitoredItem));
1232
0
    TAILQ_INIT(&mon->queue);
1233
0
    mon->triggeredUntil = UA_INT64_MIN;
1234
0
}
1235
1236
static UA_StatusCode
1237
addMonitoredItemBackpointer(UA_Server *server, UA_Session *session,
1238
0
                            UA_Node *node, void *data) {
1239
0
    UA_MonitoredItem *mon = (UA_MonitoredItem*)data;
1240
0
    UA_assert(mon != (UA_MonitoredItem*)~0);
1241
0
    mon->sampling.nodeListNext = node->head.monitoredItems;
1242
0
    node->head.monitoredItems = mon;
1243
0
    return UA_STATUSCODE_GOOD;
1244
0
}
1245
1246
static UA_StatusCode
1247
removeMonitoredItemBackPointer(UA_Server *server, UA_Session *session,
1248
0
                               UA_Node *node, void *data) {
1249
0
    if(!node->head.monitoredItems)
1250
0
        return UA_STATUSCODE_GOOD;
1251
1252
    /* Edge case that it's the first element */
1253
0
    UA_MonitoredItem *remove = (UA_MonitoredItem*)data;
1254
0
    if(node->head.monitoredItems == remove) {
1255
0
        node->head.monitoredItems = remove->sampling.nodeListNext;
1256
0
        return UA_STATUSCODE_GOOD;
1257
0
    }
1258
1259
0
    UA_MonitoredItem *prev = node->head.monitoredItems;
1260
0
    UA_MonitoredItem *entry = prev->sampling.nodeListNext;
1261
0
    for(; entry != NULL; prev = entry, entry = entry->sampling.nodeListNext) {
1262
0
        if(entry == remove) {
1263
0
            prev->sampling.nodeListNext = entry->sampling.nodeListNext;
1264
0
            break;
1265
0
        }
1266
0
    }
1267
1268
0
    return UA_STATUSCODE_GOOD;
1269
0
}
1270
1271
void
1272
0
UA_MonitoredItem_register(UA_Server *server, UA_MonitoredItem *mon) {
1273
0
    UA_LOCK_ASSERT(&server->serviceMutex);
1274
1275
0
    if(mon->registered)
1276
0
        return;
1277
0
    mon->registered = true;
1278
1279
    /* Register in Subscription and Server */
1280
0
    UA_Subscription *sub = mon->subscription;
1281
0
    mon->monitoredItemId = ++sub->lastMonitoredItemId;
1282
0
    mon->subscription = sub;
1283
0
    LIST_INSERT_HEAD(&sub->monitoredItems, mon, listEntry);
1284
0
    sub->monitoredItemsSize++;
1285
0
    server->monitoredItemsSize++;
1286
1287
    /* Register the MonitoredItem in userland */
1288
0
    if(server->config.monitoredItemRegisterCallback) {
1289
0
        UA_Session *session = sub->session;
1290
0
        void *targetContext = NULL;
1291
0
        getNodeContext(server, mon->itemToMonitor.nodeId, &targetContext);
1292
0
        server->config.monitoredItemRegisterCallback(server,
1293
0
                                                     session ? &session->sessionId : NULL,
1294
0
                                                     session ? session->context : NULL,
1295
0
                                                     &mon->itemToMonitor.nodeId,
1296
0
                                                     targetContext,
1297
0
                                                     mon->itemToMonitor.attributeId, false);
1298
0
    }
1299
0
}
1300
1301
static void
1302
0
UA_Server_unregisterMonitoredItem(UA_Server *server, UA_MonitoredItem *mon) {
1303
0
    UA_LOCK_ASSERT(&server->serviceMutex);
1304
1305
0
    if(!mon->registered)
1306
0
        return;
1307
0
    mon->registered = false;
1308
1309
0
    UA_Subscription *sub = mon->subscription;
1310
0
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub,
1311
0
                             "MonitoredItem %" PRIi32 " | Deleting the MonitoredItem",
1312
0
                             mon->monitoredItemId);
1313
1314
    /* Deregister MonitoredItem in userland */
1315
0
    if(server->config.monitoredItemRegisterCallback) {
1316
0
        UA_Session *session = sub->session;
1317
0
        void *targetContext = NULL;
1318
0
        getNodeContext(server, mon->itemToMonitor.nodeId, &targetContext);
1319
0
        server->config.monitoredItemRegisterCallback(server,
1320
0
                                                     session ? &session->sessionId : NULL,
1321
0
                                                     session ? session->context : NULL,
1322
0
                                                     &mon->itemToMonitor.nodeId,
1323
0
                                                     targetContext,
1324
0
                                                     mon->itemToMonitor.attributeId, true);
1325
0
    }
1326
1327
    /* Deregister in Subscription and server */
1328
0
    sub->monitoredItemsSize--;
1329
0
    LIST_REMOVE(mon, listEntry);
1330
0
    server->monitoredItemsSize--;
1331
0
}
1332
1333
UA_StatusCode
1334
UA_MonitoredItem_setMonitoringMode(UA_Server *server, UA_MonitoredItem *mon,
1335
0
                                   UA_MonitoringMode monitoringMode) {
1336
    /* Check if the MonitoringMode is valid or not */
1337
0
    if(monitoringMode > UA_MONITORINGMODE_REPORTING)
1338
0
        return UA_STATUSCODE_BADMONITORINGMODEINVALID;
1339
1340
    /* Set the MonitoringMode, store the old mode */
1341
0
    UA_MonitoringMode oldMode = mon->monitoringMode;
1342
0
    mon->monitoringMode = monitoringMode;
1343
1344
0
    UA_Notification *notification;
1345
    /* Reporting is disabled. This causes all Notifications to be dequeued and
1346
     * deleted. Also remove the last samples so that we immediately generate a
1347
     * Notification when re-activated. */
1348
0
    if(mon->monitoringMode == UA_MONITORINGMODE_DISABLED) {
1349
0
        UA_Notification *notification_tmp;
1350
0
        UA_MonitoredItem_unregisterSampling(server, mon);
1351
0
        TAILQ_FOREACH_SAFE(notification, &mon->queue, monEntry, notification_tmp) {
1352
0
            UA_Notification_delete(notification);
1353
0
        }
1354
0
        UA_DataValue_clear(&mon->lastValue);
1355
0
        return UA_STATUSCODE_GOOD;
1356
0
    }
1357
1358
    /* When reporting is enabled, put all notifications that were already
1359
     * sampled into the global queue of the subscription. When sampling is
1360
     * enabled, remove all notifications from the global queue. !!! This needs
1361
     * to be the same operation as in UA_Notification_enqueue !!! */
1362
0
    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
1363
        /* Make all notifications reporting. Re-enqueue to ensure they have the
1364
         * right order if some notifications are already reported by a trigger
1365
         * link. */
1366
0
        TAILQ_FOREACH(notification, &mon->queue, monEntry) {
1367
0
            UA_Notification_dequeueSub(notification);
1368
0
            UA_Notification_enqueueSub(notification);
1369
0
        }
1370
0
    } else /* mon->monitoringMode == UA_MONITORINGMODE_SAMPLING */ {
1371
        /* Make all notifications non-reporting */
1372
0
        TAILQ_FOREACH(notification, &mon->queue, monEntry)
1373
0
            UA_Notification_dequeueSub(notification);
1374
0
    }
1375
1376
    /* Register the sampling callback with an interval. If registering the
1377
     * sampling callback failed, set to disabled. But don't delete the current
1378
     * notifications. */
1379
0
    UA_StatusCode res = UA_MonitoredItem_registerSampling(server, mon);
1380
0
    if(res != UA_STATUSCODE_GOOD) {
1381
0
        mon->monitoringMode = UA_MONITORINGMODE_DISABLED;
1382
0
        return res;
1383
0
    }
1384
1385
    /* Manually create the first sample if the MonitoredItem was disabled, the
1386
     * MonitoredItem is now sampling (or reporting) and it is not an
1387
     * Event-MonitoredItem */
1388
0
    if(oldMode == UA_MONITORINGMODE_DISABLED &&
1389
0
       mon->monitoringMode > UA_MONITORINGMODE_DISABLED &&
1390
0
       mon->itemToMonitor.attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER)
1391
0
        UA_MonitoredItem_sample(server, mon);
1392
1393
0
    return UA_STATUSCODE_GOOD;
1394
0
}
1395
1396
static void
1397
0
delayedFreeMonitoredItem(void *app, void *context) {
1398
0
    UA_free(context);
1399
0
}
1400
1401
void
1402
0
UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *mon) {
1403
0
    UA_LOCK_ASSERT(&server->serviceMutex);
1404
1405
    /* Remove the sampling callback */
1406
0
    UA_MonitoredItem_unregisterSampling(server, mon);
1407
1408
    /* Deregister in Server and Subscription */
1409
0
    if(mon->registered)
1410
0
        UA_Server_unregisterMonitoredItem(server, mon);
1411
1412
    /* Cancel outstanding async reads. The status code avoids the sample being
1413
     * processed. Call _processReady to ensure that the callbacks have been
1414
     * triggered. */
1415
0
    if(mon->outstandingAsyncReads > 0)
1416
0
        async_cancel(server, mon, UA_STATUSCODE_BADREQUESTCANCELLEDBYREQUEST, true);
1417
0
    UA_assert(mon->outstandingAsyncReads == 0);
1418
1419
    /* Remove the TriggeringLinks */
1420
0
    if(mon->triggeringLinksSize > 0) {
1421
0
        UA_free(mon->triggeringLinks);
1422
0
        mon->triggeringLinks = NULL;
1423
0
        mon->triggeringLinksSize = 0;
1424
0
    }
1425
1426
    /* Remove the queued notifications attached to the subscription */
1427
0
    UA_Notification *notification, *notification_tmp;
1428
0
    TAILQ_FOREACH_SAFE(notification, &mon->queue, monEntry, notification_tmp) {
1429
0
        UA_Notification_delete(notification);
1430
0
    }
1431
1432
    /* Remove the settings */
1433
0
    UA_ReadValueId_clear(&mon->itemToMonitor);
1434
0
    UA_MonitoringParameters_clear(&mon->parameters);
1435
1436
    /* Remove the last samples */
1437
0
    UA_DataValue_clear(&mon->lastValue);
1438
1439
    /* If this is a local MonitoredItem, clean up additional values */
1440
0
    if(mon->subscription == server->adminSubscription) {
1441
0
        UA_LocalMonitoredItem *lm = (UA_LocalMonitoredItem*)mon;
1442
0
        for(size_t i = 0; i < lm->eventFields.mapSize; i++)
1443
0
            UA_Variant_init(&lm->eventFields.map[i].value);
1444
0
        UA_KeyValueMap_clear(&lm->eventFields);
1445
0
    }
1446
1447
    /* Add a delayed callback to remove the MonitoredItem when the current jobs
1448
     * have completed. This is needed to allow that a local MonitoredItem can
1449
     * remove itself in the callback. */
1450
0
    mon->delayedFreePointers.callback = delayedFreeMonitoredItem;
1451
0
    mon->delayedFreePointers.application = NULL;
1452
0
    mon->delayedFreePointers.context = mon;
1453
0
    UA_EventLoop *el = server->config.eventLoop;
1454
0
    el->addDelayedCallback(el, &mon->delayedFreePointers);
1455
0
}
1456
1457
void
1458
0
UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
1459
    /* There can be only one EventOverflow more than normal entries. Because
1460
     * EventOverflows are never adjacent. */
1461
0
    UA_assert(mon->queueSize >= mon->eventOverflows);
1462
0
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
1463
1464
    /* MonitoredItems are always attached to a Subscription */
1465
0
    UA_Subscription *sub = mon->subscription;
1466
0
    UA_assert(sub);
1467
1468
    /* Nothing to do */
1469
0
    if(mon->queueSize - mon->eventOverflows <= mon->parameters.queueSize)
1470
0
        return;
1471
1472
    /* Help clang-analyzer */
1473
0
#if defined(UA_DEBUG) && defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
1474
0
    UA_Notification *last_del = NULL;
1475
0
    (void)last_del;
1476
0
#endif
1477
1478
    /* Remove notifications until the required queue size is reached */
1479
0
    UA_Boolean reporting = false;
1480
0
    size_t remove = mon->queueSize - mon->eventOverflows - mon->parameters.queueSize;
1481
0
    while(remove > 0) {
1482
        /* The minimum queue size (without EventOverflows) is 1. At least two
1483
         * notifications that are not EventOverflows are in the queue. */
1484
0
        UA_assert(mon->queueSize - mon->eventOverflows >= 2);
1485
1486
        /* Select the next notification to delete. Skip over overflow events. */
1487
0
        UA_Notification *del = NULL;
1488
0
        if(mon->parameters.discardOldest) {
1489
            /* Remove the oldest */
1490
0
            del = TAILQ_FIRST(&mon->queue);
1491
0
#if defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
1492
            /* Skip overflow events */
1493
0
            while(del->isOverflowEvent) {
1494
0
                del = TAILQ_NEXT(del, monEntry);
1495
0
                UA_assert(del != last_del);
1496
0
            }
1497
0
#endif
1498
0
        } else {
1499
            /* Remove the second newest (to keep the up-to-date notification).
1500
             * The last entry is not an OverflowEvent -- we just added it. */
1501
0
            del = TAILQ_LAST(&mon->queue, NotificationQueue);
1502
0
            del = TAILQ_PREV(del, NotificationQueue, monEntry);
1503
0
#if defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
1504
            /* skip overflow events */
1505
0
            while(del->isOverflowEvent) {
1506
0
                del = TAILQ_PREV(del, NotificationQueue, monEntry);
1507
0
                UA_assert(del != last_del);
1508
0
            }
1509
0
#endif
1510
0
        }
1511
1512
0
        UA_assert(del); /* There must be one entry that can be deleted */
1513
1514
        /* Only create OverflowEvents (and set InfoBits) if the notification
1515
         * that is removed is reported */
1516
0
        if(TAILQ_NEXT(del, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL)
1517
0
            reporting = true;
1518
1519
        /* Move the entry after del in the per-MonitoredItem queue right after
1520
         * del in the per-Subscription queue. So we don't starve MonitoredItems
1521
         * with a high sampling interval in the Subscription queue by always
1522
         * removing their first appearance in the per-Subscription queue.
1523
         *
1524
         * With MonitoringMode == SAMPLING, the Notifications are not (all) in
1525
         * the per-Subscription queue. Don't reinsert in that case.
1526
         *
1527
         * For the reinsertion to work, first insert into the per-Subscription
1528
         * queue. */
1529
0
        if(TAILQ_NEXT(del, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
1530
0
            UA_Notification *after_del = TAILQ_NEXT(del, monEntry);
1531
0
            UA_assert(after_del); /* There must be one remaining element after del */
1532
0
            if(TAILQ_NEXT(after_del, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
1533
0
                TAILQ_REMOVE(&sub->notificationQueue, after_del, subEntry);
1534
0
                TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, subEntry);
1535
0
            }
1536
0
        }
1537
1538
0
        remove--;
1539
1540
        /* Delete the notification and remove it from the queues */
1541
0
        UA_Notification_delete(del);
1542
1543
        /* Update the subscription diagnostics statistics */
1544
0
#ifdef UA_ENABLE_DIAGNOSTICS
1545
0
        sub->monitoringQueueOverflowCount++;
1546
0
#endif
1547
1548
        /* Help scan-analyzer */
1549
0
#if defined(UA_DEBUG) && defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
1550
0
        last_del = del;
1551
0
#endif
1552
0
        UA_assert(del != TAILQ_FIRST(&mon->queue));
1553
0
        UA_assert(del != TAILQ_LAST(&mon->queue, NotificationQueue));
1554
0
        UA_assert(del != TAILQ_PREV(TAILQ_LAST(&mon->queue, NotificationQueue),
1555
0
                                    NotificationQueue, monEntry));
1556
0
    }
1557
1558
    /* Leave an entry to indicate that notifications were removed */
1559
0
    if(reporting) {
1560
0
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
1561
0
        if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
1562
0
            createEventOverflowNotification(server, sub, mon);
1563
0
        else
1564
0
#endif
1565
0
            setOverflowInfoBits(mon);
1566
0
    }
1567
0
}
1568
1569
static void
1570
0
UA_MonitoredItem_lockAndSample(UA_Server *server, UA_MonitoredItem *mon) {
1571
0
    lockServer(server);
1572
0
    UA_MonitoredItem_sample(server, mon);
1573
0
    unlockServer(server);
1574
0
}
1575
1576
UA_StatusCode
1577
0
UA_MonitoredItem_registerSampling(UA_Server *server, UA_MonitoredItem *mon) {
1578
0
    UA_LOCK_ASSERT(&server->serviceMutex);
1579
1580
    /* Sampling is already registered */
1581
0
    if(mon->samplingType != UA_MONITOREDITEMSAMPLINGTYPE_NONE)
1582
0
        return UA_STATUSCODE_GOOD;
1583
1584
    /* The subscription is attached to a session at this point */
1585
0
    UA_Subscription *sub = mon->subscription;
1586
0
    if(!sub->session)
1587
0
        return UA_STATUSCODE_BADINTERNALERROR;
1588
1589
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
1590
0
    if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER ||
1591
0
       mon->parameters.samplingInterval == 0.0) {
1592
        /* Add to the linked list in the node */
1593
0
        res = editNode(server, sub->session, &mon->itemToMonitor.nodeId, 0,
1594
0
                       UA_REFERENCETYPESET_NONE, UA_BROWSEDIRECTION_INVALID,
1595
0
                       addMonitoredItemBackpointer, mon);
1596
0
        if(res == UA_STATUSCODE_GOOD)
1597
0
            mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_EVENT;
1598
0
    } else if(mon->parameters.samplingInterval == sub->publishingInterval) {
1599
        /* Add to the subscription for sampling before every publish */
1600
0
        LIST_INSERT_HEAD(&sub->samplingMonitoredItems, mon,
1601
0
                         sampling.subscriptionSampling);
1602
0
        mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_PUBLISH;
1603
0
    } else {
1604
        /* DataChange MonitoredItems with a positive sampling interval have a
1605
         * repeated callback. Other MonitoredItems are attached to the Node in a
1606
         * linked list of backpointers. */
1607
0
        res = addRepeatedCallback(server,
1608
0
                                  (UA_ServerCallback)UA_MonitoredItem_lockAndSample,
1609
0
                                  mon, mon->parameters.samplingInterval,
1610
0
                                  &mon->sampling.callbackId);
1611
0
        if(res == UA_STATUSCODE_GOOD)
1612
0
            mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_CYCLIC;
1613
0
    }
1614
1615
0
    return res;
1616
0
}
1617
1618
void
1619
0
UA_MonitoredItem_unregisterSampling(UA_Server *server, UA_MonitoredItem *mon) {
1620
0
    UA_LOCK_ASSERT(&server->serviceMutex);
1621
1622
0
    switch(mon->samplingType) {
1623
0
    case UA_MONITOREDITEMSAMPLINGTYPE_CYCLIC:
1624
        /* Remove repeated callback */
1625
0
        removeCallback(server, mon->sampling.callbackId);
1626
0
        break;
1627
1628
0
    case UA_MONITOREDITEMSAMPLINGTYPE_EVENT: {
1629
        /* Removing is always done with the AdminSession. So it also works when
1630
         * the Subscription has been detached from its Session. */
1631
0
        editNode(server, &server->adminSession, &mon->itemToMonitor.nodeId, 0,
1632
0
                 UA_REFERENCETYPESET_NONE, UA_BROWSEDIRECTION_INVALID,
1633
0
                 removeMonitoredItemBackPointer, mon);
1634
0
        break;
1635
0
    }
1636
1637
0
    case UA_MONITOREDITEMSAMPLINGTYPE_PUBLISH:
1638
        /* Added to the subscription */
1639
0
        LIST_REMOVE(mon, sampling.subscriptionSampling);
1640
0
        break;
1641
1642
0
    case UA_MONITOREDITEMSAMPLINGTYPE_NONE:
1643
0
    default:
1644
        /* Sampling is not registered */
1645
0
        break;
1646
0
    }
1647
1648
0
    mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_NONE;
1649
0
}
1650
1651
UA_StatusCode
1652
0
UA_MonitoredItem_removeLink(UA_Subscription *sub, UA_MonitoredItem *mon, UA_UInt32 linkId) {
1653
    /* Find the index */
1654
0
    size_t i = 0;
1655
0
    for(; i < mon->triggeringLinksSize; i++) {
1656
0
        if(mon->triggeringLinks[i] == linkId)
1657
0
            break;
1658
0
    }
1659
1660
    /* Not existing / already removed */
1661
0
    if(i == mon->triggeringLinksSize)
1662
0
        return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
1663
1664
    /* Remove the link */
1665
0
    mon->triggeringLinksSize--;
1666
0
    if(mon->triggeringLinksSize == 0) {
1667
0
        UA_free(mon->triggeringLinks);
1668
0
        mon->triggeringLinks = NULL;
1669
0
    } else {
1670
0
        mon->triggeringLinks[i] = mon->triggeringLinks[mon->triggeringLinksSize];
1671
0
        UA_UInt32 *tmpLinks = (UA_UInt32*)
1672
0
            UA_realloc(mon->triggeringLinks, mon->triggeringLinksSize * sizeof(UA_UInt32));
1673
0
        if(tmpLinks)
1674
0
            mon->triggeringLinks = tmpLinks;
1675
0
    }
1676
1677
    /* Does the target MonitoredItem exist? This is stupid, but the CTT wants us
1678
     * to to this. We don't auto-remove links together with the target
1679
     * MonitoredItem. Links to removed MonitoredItems are removed when the link
1680
     * triggers and the target no longer exists. */
1681
0
    UA_MonitoredItem *mon2 = UA_Subscription_getMonitoredItem(sub, linkId);
1682
0
    if(!mon2)
1683
0
        return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
1684
1685
0
    return UA_STATUSCODE_GOOD;
1686
0
}
1687
1688
UA_StatusCode
1689
0
UA_MonitoredItem_addLink(UA_Subscription *sub, UA_MonitoredItem *mon, UA_UInt32 linkId) {
1690
    /* Does the target MonitoredItem exist? */
1691
0
    UA_MonitoredItem *mon2 = UA_Subscription_getMonitoredItem(sub, linkId);
1692
0
    if(!mon2)
1693
0
        return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
1694
1695
    /* Does the link already exist? */
1696
0
    for(size_t i = 0 ; i < mon->triggeringLinksSize; i++) {
1697
0
        if(mon->triggeringLinks[i] == linkId)
1698
0
            return UA_STATUSCODE_GOOD;
1699
0
    }
1700
1701
    /* Allocate the memory */
1702
0
    UA_UInt32 *tmpLinkIds = (UA_UInt32*)
1703
0
        UA_realloc(mon->triggeringLinks, (mon->triggeringLinksSize + 1) * sizeof(UA_UInt32));
1704
0
    if(!tmpLinkIds)
1705
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
1706
0
    mon->triggeringLinks = tmpLinkIds;
1707
1708
    /* Add the link */
1709
0
    mon->triggeringLinks[mon->triggeringLinksSize] = linkId;
1710
0
    mon->triggeringLinksSize++;
1711
0
    return UA_STATUSCODE_GOOD;
1712
0
}
1713
1714
#endif /* UA_ENABLE_SUBSCRIPTIONS */