/src/open62541/src/server/ua_subscription.c
Line | Count | Source |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
6 | | * Copyright 2015 (c) Chris Iatrou |
7 | | * Copyright 2015 (c) Joakim L. Gilje |
8 | | * Copyright 2015-2016 (c) Sten Grüner |
9 | | * Copyright 2015-2016 (c) Oleksiy Vasylyev |
10 | | * Copyright 2016-2017 (c) Florian Palm |
11 | | * Copyright 2017 (c) frax2222 |
12 | | * Copyright 2017 (c) Mattias Bornhager |
13 | | * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
14 | | * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA |
15 | | * Copyright 2017-2018 (c) Ari Breitkreuz, fortiss GmbH |
16 | | * Copyright 2018 (c) Hilscher Gesellschaft für Systemautomation mbH (Author: Martin Lang) |
17 | | * Copyright 2018 (c) Fabian Arndt, Root-Core |
18 | | * Copyright 2019 (c) HMS Industrial Networks AB (Author: Jonas Green) |
19 | | * Copyright 2020-2021 (c) Christian von Arnim, ISW University of Stuttgart (for VDW and umati) |
20 | | */ |
21 | | |
22 | | #include "ua_server_internal.h" |
23 | | #include "ua_subscription.h" |
24 | | #include "itoa.h" |
25 | | |
26 | | #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ |
27 | | |
28 | 0 | #define UA_MAX_RETRANSMISSIONQUEUESIZE 256 |
29 | | |
30 | | /****************/ |
31 | | /* Notification */ |
32 | | /****************/ |
33 | | |
34 | | static void UA_Notification_dequeueMon(UA_Notification *n); |
35 | | static void UA_Notification_enqueueSub(UA_Notification *n); |
36 | | static void UA_Notification_dequeueSub(UA_Notification *n); |
37 | | |
38 | | UA_Notification * |
39 | 0 | UA_Notification_new(void) { |
40 | 0 | UA_Notification *n = (UA_Notification*)UA_calloc(1, sizeof(UA_Notification)); |
41 | 0 | if(n) { |
42 | | /* Set the sentinel for a notification that is not enqueued a |
43 | | * subscription */ |
44 | 0 | TAILQ_NEXT(n, subEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL; |
45 | 0 | } |
46 | 0 | return n; |
47 | 0 | } |
48 | | |
49 | | /* Dequeue and delete the notification */ |
50 | | static void |
51 | 0 | UA_Notification_delete(UA_Notification *n) { |
52 | 0 | UA_assert(n != UA_SUBSCRIPTION_QUEUE_SENTINEL); |
53 | 0 | UA_assert(n->mon); |
54 | 0 | UA_Notification_dequeueMon(n); |
55 | 0 | UA_Notification_dequeueSub(n); |
56 | 0 | switch(n->mon->itemToMonitor.attributeId) { |
57 | 0 | #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
58 | 0 | case UA_ATTRIBUTEID_EVENTNOTIFIER: |
59 | 0 | UA_EventFieldList_clear(&n->data.event); |
60 | 0 | break; |
61 | 0 | #endif |
62 | 0 | default: |
63 | 0 | UA_MonitoredItemNotification_clear(&n->data.dataChange); |
64 | 0 | break; |
65 | 0 | } |
66 | 0 | UA_free(n); |
67 | 0 | } |
68 | | |
/* Add to the MonitoredItem queue, update all counters and then handle overflow */
static void
UA_Notification_enqueueMon(UA_Server *server, UA_Notification *n) {
    UA_MonitoredItem *mon = n->mon;
    UA_assert(mon);

    /* Add to the MonitoredItem */
    TAILQ_INSERT_TAIL(&mon->queue, n, monEntry);
    ++mon->queueSize;

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    /* Synthetic overflow events are counted separately from regular
     * notifications in the queue */
    if(n->isOverflowEvent)
        ++mon->eventOverflows;
#endif

    /* Test for consistency */
    UA_assert(mon->queueSize >= mon->eventOverflows);
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);

    /* Ensure enough space is available in the MonitoredItem. Do this only after
     * adding the new Notification. */
    UA_MonitoredItem_ensureQueueSpace(server, mon);

    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, mon->subscription,
                              "MonitoredItem %" PRIi32 " | "
                              "Notification enqueued (Queue size %lu / %lu)",
                              mon->monitoredItemId,
                              (long unsigned)mon->queueSize,
                              (long unsigned)mon->parameters.queueSize);
}
99 | | |
100 | | static void |
101 | 0 | UA_Notification_enqueueSub(UA_Notification *n) { |
102 | 0 | UA_MonitoredItem *mon = n->mon; |
103 | 0 | UA_assert(mon); |
104 | | |
105 | 0 | UA_Subscription *sub = mon->subscription; |
106 | 0 | UA_assert(sub); |
107 | | |
108 | 0 | if(TAILQ_NEXT(n, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) |
109 | 0 | return; |
110 | | |
111 | | /* Add to the subscription if reporting is enabled */ |
112 | 0 | TAILQ_INSERT_TAIL(&sub->notificationQueue, n, subEntry); |
113 | 0 | ++sub->notificationQueueSize; |
114 | |
|
115 | 0 | switch(mon->itemToMonitor.attributeId) { |
116 | 0 | #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
117 | 0 | case UA_ATTRIBUTEID_EVENTNOTIFIER: |
118 | 0 | ++sub->eventNotifications; |
119 | 0 | break; |
120 | 0 | #endif |
121 | 0 | default: |
122 | 0 | ++sub->dataChangeNotifications; |
123 | 0 | break; |
124 | 0 | } |
125 | 0 | } |
126 | | |
/* Enqueue a freshly sampled Notification: into the Subscription publishing
 * queue when reporting (or sampling with an active trigger), always into the
 * MonitoredItem queue, then process triggering links and kick off local
 * publishing for the adminSubscription. */
void
UA_Notification_enqueueAndTrigger(UA_Server *server, UA_Notification *n) {
    UA_MonitoredItem *mon = n->mon;
    UA_Subscription *sub = mon->subscription;
    UA_assert(sub); /* A MonitoredItem is always attached to a subscription. Can
                     * be a local MonitoredItem that gets published immediately
                     * with a callback. */

    /* If reporting or (sampled+triggered), enqueue into the Subscription first
     * and then into the MonitoredItem. UA_MonitoredItem_ensureQueueSpace
     * (called within UA_Notification_enqueueMon) assumes the notification is
     * already in the Subscription's publishing queue. */
    UA_EventLoop *el = server->config.eventLoop;
    UA_DateTime nowMonotonic = el->dateTime_nowMonotonic(el);
    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING ||
       (mon->monitoringMode == UA_MONITORINGMODE_SAMPLING &&
        mon->triggeredUntil > nowMonotonic)) {
        UA_Notification_enqueueSub(n);
        mon->triggeredUntil = UA_INT64_MIN; /* Consume the trigger */
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                                  "Notification enqueued (Queue size %lu)",
                                  (long unsigned)sub->notificationQueueSize);
    }

    /* Insert into the MonitoredItem. This checks the queue size and
     * handles overflow. */
    UA_Notification_enqueueMon(server, n);

    /* Walk the triggering links backwards. The unsigned index wraps around
     * below zero, which terminates the loop; iterating backwards also stays
     * valid when a link is removed during the iteration. */
    for(size_t i = mon->triggeringLinksSize - 1; i < mon->triggeringLinksSize; i--) {
        /* Get the triggered MonitoredItem. Remove the link if the MI doesn't exist. */
        UA_MonitoredItem *triggeredMon =
            UA_Subscription_getMonitoredItem(sub, mon->triggeringLinks[i]);
        if(!triggeredMon) {
            UA_MonitoredItem_removeLink(sub, mon, mon->triggeringLinks[i]);
            continue;
        }

        /* Only sampling MonitoredItems receive a trigger. Reporting
         * MonitoredItems send out Notifications anyway and disabled
         * MonitoredItems don't create samples to send. */
        if(triggeredMon->monitoringMode != UA_MONITORINGMODE_SAMPLING)
            continue;

        /* Get the latest sampled Notification from the triggered MonitoredItem.
         * Enqueue for publication. */
        UA_Notification *n2 = TAILQ_LAST(&triggeredMon->queue, NotificationQueue);
        if(n2)
            UA_Notification_enqueueSub(n2);

        /* The next Notification within the publishing interval is going to be
         * published as well. (Falsely) assume that the publishing cycle has
         * started right now, so that we don't have to loop over MonitoredItems
         * to deactivate the triggering after the publishing cycle. */
        triggeredMon->triggeredUntil = nowMonotonic +
            (UA_DateTime)(sub->publishingInterval * (UA_Double)UA_DATETIME_MSEC);

        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                                  "MonitoredItem %u triggers MonitoredItem %u",
                                  mon->monitoredItemId, triggeredMon->monitoredItemId);
    }

    /* If we just enqueued a notification into the local adminSubscription, then
     * register a delayed callback for "local publishing". */
    if(sub == server->adminSubscription && !sub->delayedCallbackRegistered) {
        sub->delayedCallbackRegistered = true;
        sub->delayedMoreNotifications.callback =
            (UA_Callback)UA_Subscription_localPublish;
        sub->delayedMoreNotifications.application = server;
        sub->delayedMoreNotifications.context = sub;

        el = server->config.eventLoop;
        el->addDelayedCallback(el, &sub->delayedMoreNotifications);
    }
}
201 | | |
/* Remove from the MonitoredItem queue. This only happens if the Notification is
 * deleted right after. */
static void
UA_Notification_dequeueMon(UA_Notification *n) {
    UA_MonitoredItem *mon = n->mon;
    UA_assert(mon);

    /* Remove from the MonitoredItem queue */
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    /* Overflow events are tracked in a separate counter */
    if(n->isOverflowEvent)
        --mon->eventOverflows;
#endif

    TAILQ_REMOVE(&mon->queue, n, monEntry);
    --mon->queueSize;

    /* Test for consistency */
    UA_assert(mon->queueSize >= mon->eventOverflows);
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
}
222 | | |
223 | | void |
224 | 0 | UA_Notification_dequeueSub(UA_Notification *n) { |
225 | 0 | if(TAILQ_NEXT(n, subEntry) == UA_SUBSCRIPTION_QUEUE_SENTINEL) |
226 | 0 | return; |
227 | | |
228 | 0 | UA_MonitoredItem *mon = n->mon; |
229 | 0 | UA_assert(mon); |
230 | 0 | UA_Subscription *sub = mon->subscription; |
231 | 0 | UA_assert(sub); |
232 | | |
233 | 0 | switch(mon->itemToMonitor.attributeId) { |
234 | 0 | #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
235 | 0 | case UA_ATTRIBUTEID_EVENTNOTIFIER: |
236 | 0 | --sub->eventNotifications; |
237 | 0 | break; |
238 | 0 | #endif |
239 | 0 | default: |
240 | 0 | --sub->dataChangeNotifications; |
241 | 0 | break; |
242 | 0 | } |
243 | | |
244 | 0 | TAILQ_REMOVE(&sub->notificationQueue, n, subEntry); |
245 | 0 | --sub->notificationQueueSize; |
246 | | |
247 | | /* Reset the sentinel */ |
248 | 0 | TAILQ_NEXT(n, subEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL; |
249 | 0 | } |
250 | | |
251 | | /****************/ |
252 | | /* Subscription */ |
253 | | /****************/ |
254 | | |
255 | | UA_Subscription * |
256 | 502 | UA_Subscription_new(void) { |
257 | | /* Allocate the memory */ |
258 | 502 | UA_Subscription *newSub = (UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription)); |
259 | 502 | if(!newSub) |
260 | 0 | return NULL; |
261 | | |
262 | | /* The first publish response is sent immediately */ |
263 | 502 | newSub->state = UA_SUBSCRIPTIONSTATE_STOPPED; |
264 | | |
265 | | /* Even if the first publish response is a keepalive the sequence number is 1. |
266 | | * This can happen by a subscription without a monitored item (see CTT test scripts). */ |
267 | 502 | newSub->nextSequenceNumber = 1; |
268 | | |
269 | 502 | TAILQ_INIT(&newSub->retransmissionQueue); |
270 | 502 | TAILQ_INIT(&newSub->notificationQueue); |
271 | 502 | return newSub; |
272 | 502 | } |
273 | | |
/* Executed from the EventLoop once the jobs that may still hold a pointer to
 * the subscription have completed; context is the subscription itself. */
static void
delayedFreeSubscription(void *app, void *context) {
    (void)app; /* unused */
    UA_free(context);
}
278 | | |
/* Tear down a Subscription: stop the publish callback, detach from the
 * session and server, delete all MonitoredItems and the retransmission
 * queue. The memory itself is released via a delayed callback because
 * pointers to the subscription may still exist upwards in the call stack. */
void
UA_Subscription_delete(UA_Server *server, UA_Subscription *sub) {
    UA_LOCK_ASSERT(&server->serviceMutex);

    UA_EventLoop *el = server->config.eventLoop;

    /* Unregister the publish callback and possible delayed callback */
    Subscription_setState(server, sub, UA_SUBSCRIPTIONSTATE_REMOVING);

    /* Remove delayed callbacks for processing remaining notifications */
    if(sub->delayedCallbackRegistered) {
        el->removeDelayedCallback(el, &sub->delayedMoreNotifications);
        sub->delayedCallbackRegistered = false;
    }

    /* Remove the diagnostics object for the subscription */
#ifdef UA_ENABLE_DIAGNOSTICS
    if(!UA_NodeId_isNull(&sub->ns0Id))
        deleteNode(server, sub->ns0Id, true);
    UA_NodeId_clear(&sub->ns0Id);
#endif

    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub, "Subscription deleted");

    /* Detach from the session if necessary */
    if(sub->session)
        UA_Session_detachSubscription(server, sub->session, sub, true);

    /* Remove from the server if not previously registered */
    if(sub->serverListEntry.le_prev) {
        LIST_REMOVE(sub, serverListEntry);
        UA_assert(server->subscriptionsSize > 0);
        server->subscriptionsSize--;
        server->serverDiagnosticsSummary.currentSubscriptionCount--;
    }

    /* Delete monitored Items */
    UA_assert(server->monitoredItemsSize >= sub->monitoredItemsSize);
    UA_MonitoredItem *mon, *tmp_mon;
    LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmp_mon) {
        UA_MonitoredItem_delete(server, mon);
    }
    UA_assert(sub->monitoredItemsSize == 0);

    /* Delete Retransmission Queue */
    UA_NotificationMessageEntry *nme, *nme_tmp;
    TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) {
        TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
        UA_NotificationMessage_clear(&nme->message);
        UA_free(nme);
        /* Keep the session-wide counter consistent while draining */
        if(sub->session)
            --sub->session->totalRetransmissionQueueSize;
        --sub->retransmissionQueueSize;
    }
    UA_assert(sub->retransmissionQueueSize == 0);

    /* Pointers to the subscription may still exist upwards in the call stack.
     * Add a delayed callback to remove the Subscription when the current jobs
     * have completed. */
    sub->delayedFreePointers.callback = delayedFreeSubscription;
    sub->delayedFreePointers.application = NULL;
    sub->delayedFreePointers.context = sub;
    el->addDelayedCallback(el, &sub->delayedFreePointers);
}
343 | | |
/* Reset the lifetime counter. Called on client activity so the subscription
 * only expires after lifeTimeCount publish cycles without a Publish request. */
void
Subscription_resetLifetime(UA_Subscription *sub) {
    sub->currentLifetimeCount = 0;
}
348 | | |
349 | | UA_MonitoredItem * |
350 | 0 | UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId) { |
351 | 0 | UA_MonitoredItem *mon; |
352 | 0 | LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
353 | 0 | if(mon->monitoredItemId == monitoredItemId) |
354 | 0 | break; |
355 | 0 | } |
356 | 0 | return mon; |
357 | 0 | } |
358 | | |
359 | | static void |
360 | 0 | removeOldestRetransmissionMessageFromSub(UA_Subscription *sub) { |
361 | 0 | UA_NotificationMessageEntry *oldestEntry = |
362 | 0 | TAILQ_LAST(&sub->retransmissionQueue, NotificationMessageQueue); |
363 | 0 | TAILQ_REMOVE(&sub->retransmissionQueue, oldestEntry, listEntry); |
364 | 0 | UA_NotificationMessage_clear(&oldestEntry->message); |
365 | 0 | UA_free(oldestEntry); |
366 | 0 | --sub->retransmissionQueueSize; |
367 | 0 | if(sub->session) |
368 | 0 | --sub->session->totalRetransmissionQueueSize; |
369 | |
|
370 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
371 | 0 | sub->discardedMessageCount++; |
372 | 0 | #endif |
373 | 0 | } |
374 | | |
375 | | static void |
376 | 0 | removeOldestRetransmissionMessageFromSession(UA_Session *session) { |
377 | 0 | UA_NotificationMessageEntry *oldestEntry = NULL; |
378 | 0 | UA_Subscription *oldestSub = NULL; |
379 | 0 | UA_Subscription *sub; |
380 | 0 | TAILQ_FOREACH(sub, &session->subscriptions, sessionListEntry) { |
381 | 0 | UA_NotificationMessageEntry *first = |
382 | 0 | TAILQ_LAST(&sub->retransmissionQueue, NotificationMessageQueue); |
383 | 0 | if(!first) |
384 | 0 | continue; |
385 | 0 | if(!oldestEntry || oldestEntry->message.publishTime > first->message.publishTime) { |
386 | 0 | oldestEntry = first; |
387 | 0 | oldestSub = sub; |
388 | 0 | } |
389 | 0 | } |
390 | 0 | UA_assert(oldestEntry); |
391 | 0 | UA_assert(oldestSub); |
392 | | |
393 | 0 | removeOldestRetransmissionMessageFromSub(oldestSub); |
394 | 0 | } |
395 | | |
/* Append a NotificationMessage entry to the subscription's retransmission
 * queue. If the per-subscription or the session-wide limit would be exceeded,
 * the oldest entry is dropped first, so the insert below always succeeds. */
static void
UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
                                         UA_NotificationMessageEntry *entry) {
    /* Release the oldest entry if there is not enough space */
    UA_Session *session = sub->session;
    if(sub->retransmissionQueueSize >= UA_MAX_RETRANSMISSIONQUEUESIZE) {
        UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
                                    "Subscription retransmission queue overflow");
        removeOldestRetransmissionMessageFromSub(sub);
    } else if(session && server->config.maxRetransmissionQueueSize > 0 &&
              session->totalRetransmissionQueueSize >=
              server->config.maxRetransmissionQueueSize) {
        UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
                                    "Session-wide retransmission queue overflow");
        removeOldestRetransmissionMessageFromSession(sub->session);
    }

    /* Add entry */
    TAILQ_INSERT_TAIL(&sub->retransmissionQueue, entry, listEntry);
    ++sub->retransmissionQueueSize;
    if(session)
        ++session->totalRetransmissionQueueSize;
}
419 | | |
420 | | UA_StatusCode |
421 | 0 | UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) { |
422 | | /* Find the retransmission message */ |
423 | 0 | UA_NotificationMessageEntry *entry; |
424 | 0 | TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
425 | 0 | if(entry->message.sequenceNumber == sequenceNumber) |
426 | 0 | break; |
427 | 0 | } |
428 | 0 | if(!entry) |
429 | 0 | return UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN; |
430 | | |
431 | | /* Remove the retransmission message */ |
432 | 0 | TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry); |
433 | 0 | --sub->retransmissionQueueSize; |
434 | 0 | UA_NotificationMessage_clear(&entry->message); |
435 | 0 | UA_free(entry); |
436 | |
|
437 | 0 | if(sub->session) |
438 | 0 | --sub->session->totalRetransmissionQueueSize; |
439 | |
|
440 | 0 | return UA_STATUSCODE_GOOD; |
441 | 0 | } |
442 | | |
/* Move up to maxNotifications entries from the subscription's publishing
 * queue into a NotificationMessage. DataChanges and Events are collected in
 * (at most) two ExtensionObjects. The output counters are only set when the
 * preparation is successful. */
static UA_StatusCode
prepareNotificationMessage(UA_Server *server, UA_Subscription *sub,
                           UA_NotificationMessage *message,
                           size_t maxNotifications) {
    UA_assert(maxNotifications > 0);

    /* Allocate an ExtensionObject for Event- and DataChange-Notifications. Also
     * there can be StatusChange-Notifications. The standard says in Part 4,
     * 7.2.1:
     *
     * If a Subscription contains MonitoredItems for events and data, this array
     * should have not more than 2 elements. */
    message->notificationData = (UA_ExtensionObject*)
        UA_Array_new(2, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
    if(!message->notificationData)
        return UA_STATUSCODE_BADOUTOFMEMORY;
    message->notificationDataSize = 2;

    /* Pre-allocate DataChangeNotifications */
    size_t notificationDataIdx = 0;
    size_t dcnPos = 0; /* How many DataChangeNotifications? */
    UA_DataChangeNotification *dcn = NULL;
    if(sub->dataChangeNotifications > 0) {
        dcn = UA_DataChangeNotification_new();
        if(!dcn) {
            UA_NotificationMessage_clear(message);
            return UA_STATUSCODE_BADOUTOFMEMORY;
        }
        UA_ExtensionObject_setValue(message->notificationData, dcn,
                                    &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
        size_t dcnSize = sub->dataChangeNotifications;
        if(dcnSize > maxNotifications)
            dcnSize = maxNotifications;
        dcn->monitoredItems = (UA_MonitoredItemNotification*)
            UA_Array_new(dcnSize, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
        if(!dcn->monitoredItems) {
            UA_NotificationMessage_clear(message); /* Also frees the dcn */
            return UA_STATUSCODE_BADOUTOFMEMORY;
        }
        dcn->monitoredItemsSize = dcnSize;
        notificationDataIdx++;
    }

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    /* Pre-allocate the EventNotificationList analogously */
    size_t enlPos = 0; /* How many EventNotifications? */
    UA_EventNotificationList *enl = NULL;
    if(sub->eventNotifications > 0) {
        enl = UA_EventNotificationList_new();
        if(!enl) {
            UA_NotificationMessage_clear(message);
            return UA_STATUSCODE_BADOUTOFMEMORY;
        }
        UA_ExtensionObject_setValue(&message->notificationData[notificationDataIdx],
                                    enl, &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]);
        size_t enlSize = sub->eventNotifications;
        if(enlSize > maxNotifications)
            enlSize = maxNotifications;
        enl->events = (UA_EventFieldList*)
            UA_Array_new(enlSize, &UA_TYPES[UA_TYPES_EVENTFIELDLIST]);
        if(!enl->events) {
            UA_NotificationMessage_clear(message);
            return UA_STATUSCODE_BADOUTOFMEMORY;
        }
        enl->eventsSize = enlSize;
        notificationDataIdx++;
    }
#endif

    UA_assert(notificationDataIdx > 0);
    message->notificationDataSize = notificationDataIdx;

    /* <-- The point of no return --> */

    /* How many notifications were moved to the response overall? */
    size_t totalNotifications = 0;
    UA_Notification *n, *n_tmp;
    TAILQ_FOREACH_SAFE(n, &sub->notificationQueue, subEntry, n_tmp) {
        if(totalNotifications >= maxNotifications)
            break;

        /* Move the content to the response. The payload is moved (not copied):
         * the source is re-initialized so the later delete does not free it. */
        switch(n->mon->itemToMonitor.attributeId) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
        case UA_ATTRIBUTEID_EVENTNOTIFIER:
            UA_assert(enl != NULL); /* Have at least one event notification */
            enl->events[enlPos] = n->data.event;
            UA_EventFieldList_init(&n->data.event);
            enlPos++;
            break;
#endif
        default:
            UA_assert(dcn != NULL); /* Have at least one change notification */
            dcn->monitoredItems[dcnPos] = n->data.dataChange;
            UA_DataValue_init(&n->data.dataChange.value);
            dcnPos++;
            break;
        }

        /* If there are Notifications *before this one* in the MonitoredItem-
         * local queue, remove all of them. These are earlier Notifications that
         * are non-reporting. And we don't want them to show up after the
         * current Notification has been sent out. */
        UA_Notification *prev;
        while((prev = TAILQ_PREV(n, NotificationQueue, monEntry))) {
            UA_Notification_delete(prev);

            /* Help the Clang scan-analyzer */
            UA_assert(prev != TAILQ_PREV(n, NotificationQueue, monEntry));
        }

        /* Delete the notification, remove from the queues and decrease the counters */
        UA_Notification_delete(n);

        totalNotifications++;
    }

    /* Set sizes */
    if(dcn) {
        dcn->monitoredItemsSize = dcnPos;
        if(dcnPos == 0) {
            UA_free(dcn->monitoredItems);
            dcn->monitoredItems = NULL;
        }
    }

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    if(enl) {
        enl->eventsSize = enlPos;
        if(enlPos == 0) {
            UA_free(enl->events);
            enl->events = NULL;
        }
    }
#endif

    return UA_STATUSCODE_GOOD;
}
581 | | |
582 | | /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) The value 0 is |
583 | | * never used for the sequence number */ |
584 | | static UA_UInt32 |
585 | 0 | UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) { |
586 | 0 | UA_UInt32 nextSequenceNumber = sequenceNumber + 1; |
587 | 0 | if(nextSequenceNumber == 0) |
588 | 0 | nextSequenceNumber = 1; |
589 | 0 | return nextSequenceNumber; |
590 | 0 | } |
591 | | |
/* Send out a StatusChange notification (if a publish response is queued) and
 * delete the subscription. */
static void
sendStatusChangeDelete(UA_Server *server, UA_Subscription *sub,
                       UA_PublishResponseEntry *pre) {
    /* Cannot send out the StatusChange because no response is queued. Delete
     * the Subscription without sending the StatusChange. */
    if(!pre) {
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                                  "Cannot send the StatusChange notification because "
                                  "no response is queued.");
        if(UA_StatusCode_isBad(sub->statusChange)) {
            UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                                      "Removing the subscription.");
            UA_Subscription_delete(server, sub);
        }
        return;
    }

    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                              "Sending out a StatusChange notification and "
                              "removing the subscription");

    UA_EventLoop *el = server->config.eventLoop;

    /* Populate the response */
    UA_PublishResponse *response = &pre->response;

    UA_StatusChangeNotification scn;
    UA_StatusChangeNotification_init(&scn);
    scn.status = sub->statusChange;

    /* NOTE: notificationData wraps the stack-allocated scn. The pointers are
     * detached again below before clearing the response, so the stack memory
     * is never passed to free. */
    UA_ExtensionObject notificationData;
    UA_ExtensionObject_setValue(&notificationData, &scn,
                                &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]);

    response->notificationMessage.notificationData = &notificationData;
    response->notificationMessage.notificationDataSize = 1;
    response->subscriptionId = sub->subscriptionId;
    response->notificationMessage.publishTime = el->dateTime_now(el);
    response->notificationMessage.sequenceNumber = sub->nextSequenceNumber;

    /* Send the response */
    UA_assert(sub->session); /* Otherwise pre is NULL */
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                              "Sending out a publish response");
    sendResponse(server, sub->session->channel, pre->requestId,
                 (UA_Response *)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);

    /* Clean up. Detach the stack-allocated notificationData first. */
    response->notificationMessage.notificationData = NULL;
    response->notificationMessage.notificationDataSize = 0;
    UA_PublishResponse_clear(&pre->response);
    UA_free(pre);

    /* Delete the subscription */
    UA_Subscription_delete(server, sub);
}
648 | | |
/* The local adminSubscription forwards notifications to a registered callback
 * method. This is done async from a delayed callback registered in the
 * EventLoop. Takes the server lock for the whole drain of the queue. */
void
UA_Subscription_localPublish(UA_Server *server, UA_Subscription *sub) {
    lockServer(server);
    sub->delayedCallbackRegistered = false;

    UA_Notification *n, *n_tmp;
    TAILQ_FOREACH_SAFE(n, &sub->notificationQueue, subEntry, n_tmp) {
        UA_MonitoredItem *mon = n->mon;
        /* Local subscriptions only contain UA_LocalMonitoredItem entries */
        UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*)mon;

        /* Move the content to the response */
        void *nodeContext = NULL;
        switch(mon->itemToMonitor.attributeId) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
        case UA_ATTRIBUTEID_EVENTNOTIFIER:
            /* Set the fields in the key-value map */
            UA_assert(n->data.event.eventFieldsSize == localMon->eventFields.mapSize);
            for(size_t i = 0; i < localMon->eventFields.mapSize; i++) {
                localMon->eventFields.map[i].value = n->data.event.eventFields[i];
            }

            /* Call the callback */
            localMon->callback.
                eventCallback(server, mon->monitoredItemId, localMon->context,
                              localMon->eventFields);
            break;
#endif
        default:
            getNodeContext(server, mon->itemToMonitor.nodeId, &nodeContext);
            localMon->callback.
                dataChangeCallback(server, mon->monitoredItemId, localMon->context,
                                   &mon->itemToMonitor.nodeId, nodeContext,
                                   mon->itemToMonitor.attributeId,
                                   &n->data.dataChange.value);
            break;
        }

        /* If there are Notifications *before this one* in the MonitoredItem-
         * local queue, remove all of them. These are earlier Notifications that
         * are non-reporting. And we don't want them to show up after the
         * current Notification has been sent out. */
        UA_Notification *prev;
        while((prev = TAILQ_PREV(n, NotificationQueue, monEntry))) {
            UA_Notification_delete(prev);

            /* Help the Clang scan-analyzer */
            UA_assert(prev != TAILQ_PREV(n, NotificationQueue, monEntry));
        }

        /* Delete the notification, remove from the queues and decrease the counters */
        UA_Notification_delete(n);
    }

    unlockServer(server);
}
707 | | |
/* Delayed callback: retry publishing outside the original call stack, with
 * the server lock held. Clears the registration flag so the callback can be
 * re-registered later. */
static void
delayedPublishNotifications(UA_Server *server, UA_Subscription *sub) {
    lockServer(server);
    sub->delayedCallbackRegistered = false;
    UA_Subscription_publish(server, sub);
    unlockServer(server);
}
715 | | |
/* Try to publish now. Enqueue a "next publish" as a delayed callback if not
 * done. */
void
UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
    UA_EventLoop *el = server->config.eventLoop;

    /* Get a response: dequeue the oldest queued Publish request from the
     * Session. Requests whose TimeoutHint has expired are answered with
     * BadTimeout and we continue with the next one. */
    UA_PublishResponseEntry *pre = NULL;
    if(sub->session) {
        UA_DateTime nowMonotonic = el->dateTime_nowMonotonic(el);
        do {
            /* Dequeue the oldest response */
            pre = UA_Session_dequeuePublishReq(sub->session);
            if(!pre)
                break;

            /* Check if the TimeoutHint is still valid. Otherwise return with a bad
             * statuscode and continue. */
            if(pre->maxTime < nowMonotonic) {
                UA_LOG_DEBUG_SESSION(server->config.logging, sub->session,
                                     "Publish request %u has timed out", pre->requestId);
                pre->response.responseHeader.serviceResult = UA_STATUSCODE_BADTIMEOUT;
                sendResponse(server, sub->session->channel, pre->requestId,
                             (UA_Response *)&pre->response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
                UA_PublishResponse_clear(&pre->response);
                UA_free(pre);
                pre = NULL;
            }
        } while(!pre);
    }

    /* Update the LifetimeCounter. An available request resets the lifetime;
     * otherwise the subscription eventually times out. */
    if(pre) {
        Subscription_resetLifetime(sub);
    } else {
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                                  "The publish queue is empty");
        ++sub->currentLifetimeCount;
        if(sub->currentLifetimeCount > sub->lifeTimeCount) {
            UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
                                        "End of subscription lifetime");
            /* Set the StatusChange to delete the subscription. */
            sub->statusChange = UA_STATUSCODE_BADTIMEOUT;
        }
    }

    /* Send a StatusChange notification if possible and delete the
     * Subscription */
    if(sub->statusChange != UA_STATUSCODE_GOOD) {
        sendStatusChangeDelete(server, sub, pre);
        return;
    }

    /* Disabled subscriptions do not send notifications */
    UA_UInt32 notifications = (sub->state == UA_SUBSCRIPTIONSTATE_ENABLED) ?
        sub->notificationQueueSize : 0;

    /* Limit the number of notifications to the configured maximum */
    if(notifications > sub->notificationsPerPublish)
        notifications = sub->notificationsPerPublish;

    /* Return if there are no notifications and no keepalive is due yet */
    if(notifications == 0) {
        ++sub->currentKeepAliveCount;
        if(sub->currentKeepAliveCount < sub->maxKeepAliveCount) {
            if(pre)
                UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
            return;
        }
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub, "Sending a KeepAlive");
    }

    /* We want to send a response, but cannot. Either because there is no queued
     * response or because the Subscription is detached from a Session or because
     * the SecureChannel for the Session is closed. */
    if(!pre || !sub->session || !sub->session->channel) {
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                                  "Want to send a publish response but cannot. "
                                  "The subscription is late.");
        sub->late = true;
        if(pre)
            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
        return;
    }

    UA_assert(pre);
    UA_assert(sub->session); /* Otherwise pre is NULL */

    /* Prepare the response */
    UA_PublishResponse *response = &pre->response;
    UA_NotificationMessage *message = &response->notificationMessage;
    UA_NotificationMessageEntry *retransmission = NULL;
#ifdef UA_ENABLE_DIAGNOSTICS
    /* Remember the queue sizes to derive the number of sent notifications
     * after prepareNotificationMessage has drained the queue */
    size_t priorDataChangeNotifications = sub->dataChangeNotifications;
    size_t priorEventNotifications = sub->eventNotifications;
#endif
    if(notifications > 0) {
        if(server->config.enableRetransmissionQueue) {
            /* Allocate the retransmission entry */
            retransmission = (UA_NotificationMessageEntry*)
                UA_malloc(sizeof(UA_NotificationMessageEntry));
            if(!retransmission) {
                UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
                                            "Could not allocate memory for retransmission. "
                                            "The subscription is late.");
                sub->late = true;
                UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
                return;
            }
        }

        /* Prepare the response */
        UA_StatusCode retval =
            prepareNotificationMessage(server, sub, message, notifications);
        if(retval != UA_STATUSCODE_GOOD) {
            UA_LOG_WARNING_SUBSCRIPTION(server->config.logging, sub,
                                        "Could not prepare the notification message. "
                                        "The subscription is late.");
            /* If the retransmission queue is enabled a retransmission message is allocated */
            if(retransmission)
                UA_free(retransmission);
            sub->late = true;
            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
            return;
        }
    }

    /* <-- The point of no return: no error path may re-enqueue pre below --> */

    /* Set up the response */
    response->subscriptionId = sub->subscriptionId;
    response->moreNotifications = (sub->notificationQueueSize > 0);
    message->publishTime = el->dateTime_now(el);

    /* Set sequence number to message. Started at 1 which is given during
     * creating a new subscription. The 1 is required for the initial publish
     * response with or without a monitored item. */
    message->sequenceNumber = sub->nextSequenceNumber;

    if(notifications > 0) {
        /* If the retransmission queue is enabled a retransmission message is
         * allocated */
        if(retransmission) {
            /* Put the notification message into the retransmission queue. This
             * needs to be done here, so that the message itself is included in
             * the available sequence numbers for acknowledgement. */
            retransmission->message = response->notificationMessage;
            UA_Subscription_addRetransmissionMessage(server, sub, retransmission);
        }
        /* Only if a notification was created, the sequence number must be
         * increased. For a keepalive the sequence number can be reused. */
        sub->nextSequenceNumber =
            UA_Subscription_nextSequenceNumber(sub->nextSequenceNumber);
    }

    /* Get the available sequence numbers from the retransmission queue.
     * The array lives on the stack and is detached from the response again
     * before the response is cleared below. */
    UA_assert(sub->retransmissionQueueSize <= UA_MAX_RETRANSMISSIONQUEUESIZE);
    UA_UInt32 seqNumbers[UA_MAX_RETRANSMISSIONQUEUESIZE];
    response->availableSequenceNumbers = seqNumbers;
    response->availableSequenceNumbersSize = sub->retransmissionQueueSize;
    size_t i = 0;
    UA_NotificationMessageEntry *nme;
    TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
        response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
        ++i;
    }
    UA_assert(i == sub->retransmissionQueueSize);

    /* Send the response */
    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                              "Sending out a publish response with %" PRIu32
                              " notifications", notifications);
    sendResponse(server, sub->session->channel, pre->requestId,
                 (UA_Response*)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);

    /* Reset the Subscription state to NORMAL. But only if all notifications
     * have been sent out. Otherwise keep the Subscription in the LATE state. So
     * we immediately answer incoming Publish requests.
     *
     * (We also check that session->responseQueueSize > 0 in Service_Publish. To
     * avoid answering Publish requests out of order. As we additionally may have
     * scheduled a publish callback as a delayed callback.) */
    if(sub->notificationQueueSize == 0)
        sub->late = false;

    /* Reset the KeepAlive after publishing */
    sub->currentKeepAliveCount = 0;

    /* Free the response */
    if(retransmission) {
        /* NotificationMessage was moved into retransmission queue */
        UA_NotificationMessage_init(&response->notificationMessage);
    }
    response->availableSequenceNumbers = NULL; /* Points to the stack, detach */
    response->availableSequenceNumbersSize = 0;
    UA_PublishResponse_clear(&pre->response);
    UA_free(pre);

    /* Update the diagnostics statistics */
#ifdef UA_ENABLE_DIAGNOSTICS
    sub->publishRequestCount++;

    UA_UInt32 sentDCN = (UA_UInt32)
        (priorDataChangeNotifications - sub->dataChangeNotifications);
    UA_UInt32 sentEN = (UA_UInt32)(priorEventNotifications - sub->eventNotifications);
    sub->dataChangeNotificationsCount += sentDCN;
    sub->eventNotificationsCount += sentEN;
    sub->notificationsCount += (sentDCN + sentEN);
#endif

    /* Repeat sending notifications if there are more notifications to send. But
     * only call UA_MonitoredItem_sample in the regular publish callback. */
    UA_Boolean done = (sub->notificationQueueSize == 0);
    if(!done && !sub->delayedCallbackRegistered) {
        sub->delayedCallbackRegistered = true;

        sub->delayedMoreNotifications.callback = (UA_Callback)delayedPublishNotifications;
        sub->delayedMoreNotifications.application = server;
        sub->delayedMoreNotifications.context = sub;

        el = server->config.eventLoop;
        el->addDelayedCallback(el, &sub->delayedMoreNotifications);
    }
}
940 | | |
941 | | void |
942 | 0 | UA_Subscription_resendData(UA_Server *server, UA_Subscription *sub) { |
943 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
944 | 0 | UA_assert(server); |
945 | 0 | UA_assert(sub); |
946 | | |
947 | | /* Part 4, §6.7: If this Method is called, subsequent Publish responses |
948 | | * shall contain the current values of all data MonitoredItems in the |
949 | | * Subscription where the MonitoringMode is set to Reporting. If a value is |
950 | | * queued for a data MonitoredItem, the next value in the queue is sent in |
951 | | * the Publish response. If no value is queued for a data MonitoredItem, the |
952 | | * last value sent is repeated in the Publish response. */ |
953 | 0 | UA_MonitoredItem *mon; |
954 | 0 | LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
955 | | /* Create only DataChange notifications */ |
956 | 0 | if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) |
957 | 0 | continue; |
958 | | |
959 | | /* Only if the mode is monitoring */ |
960 | 0 | if(mon->monitoringMode != UA_MONITORINGMODE_REPORTING) |
961 | 0 | continue; |
962 | | |
963 | | /* If a value is queued for a data MonitoredItem, the next value in |
964 | | * the queue is sent in the Publish response. */ |
965 | 0 | if(mon->queueSize > 0) |
966 | 0 | continue; |
967 | | |
968 | | /* Create a notification with the last sampled value */ |
969 | 0 | UA_MonitoredItem_createDataChangeNotification(server, mon, &mon->lastValue); |
970 | 0 | } |
971 | 0 | } |
972 | | |
973 | | void |
974 | 0 | UA_Session_ensurePublishQueueSpace(UA_Server* server, UA_Session* session) { |
975 | 0 | if(server->config.maxPublishReqPerSession == 0) |
976 | 0 | return; |
977 | | |
978 | 0 | while(session->responseQueueSize >= server->config.maxPublishReqPerSession) { |
979 | | /* Dequeue a response */ |
980 | 0 | UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(session); |
981 | 0 | UA_assert(pre != NULL); /* There must be a pre as session->responseQueueSize > 0 */ |
982 | | |
983 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
984 | 0 | "Sending out a publish response triggered by too many publish requests"); |
985 | | |
986 | | /* Send the response. This response has no related subscription id */ |
987 | 0 | UA_PublishResponse *response = &pre->response; |
988 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS; |
989 | 0 | sendResponse(server, session->channel, pre->requestId, |
990 | 0 | (UA_Response *)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]); |
991 | | |
992 | | /* Free the response */ |
993 | 0 | UA_PublishResponse_clear(response); |
994 | 0 | UA_free(pre); |
995 | 0 | } |
996 | 0 | } |
997 | | |
/* Periodic callback registered at the subscription's publishing interval
 * (see Subscription_setState). Samples the attached MonitoredItems and then
 * publishes the queued notifications. */
static void
sampleAndPublishCallback(UA_Server *server, UA_Subscription *sub) {
    UA_assert(sub);

    lockServer(server);

    UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, sub,
                              "Sample and Publish Callback");

    /* Sample the MonitoredItems with sampling interval <0 (which implies
     * sampling in the same interval as the subscription) */
    UA_MonitoredItem *mon;
    LIST_FOREACH(mon, &sub->samplingMonitoredItems, sampling.subscriptionSampling) {
        UA_MonitoredItem_sample(server, mon);
    }

    /* Publish the queued notifications */
    UA_Subscription_publish(server, sub);

    unlockServer(server);
}
1019 | | |
/* Transition the Subscription to a new state. States up to REMOVING stop the
 * periodic sample-and-publish callback; the remaining states (re)start it if
 * it is not yet running. Returns an error only when the repeated callback
 * cannot be registered — the Subscription is then left in STOPPED. */
UA_StatusCode
Subscription_setState(UA_Server *server, UA_Subscription *sub,
                      UA_SubscriptionState state) {
    if(state <= UA_SUBSCRIPTIONSTATE_REMOVING) {
        /* Stop the publish callback if it is registered */
        if(sub->publishCallbackId != 0) {
            removeCallback(server, sub->publishCallbackId);
            sub->publishCallbackId = 0;
#ifdef UA_ENABLE_DIAGNOSTICS
            sub->disableCount++;
#endif
        }
    } else if(sub->publishCallbackId == 0) {
        /* Start the publish callback with the publishing interval */
        UA_StatusCode res =
            addRepeatedCallback(server, (UA_ServerCallback)sampleAndPublishCallback,
                                sub, sub->publishingInterval, &sub->publishCallbackId);
        if(res != UA_STATUSCODE_GOOD) {
            sub->state = UA_SUBSCRIPTIONSTATE_STOPPED;
            return res;
        }

        /* Send (at least a) keepalive after the next publish interval */
        sub->currentKeepAliveCount = sub->maxKeepAliveCount;

#ifdef UA_ENABLE_DIAGNOSTICS
        sub->enableCount++;
#endif
    }

    sub->state = state;
    return UA_STATUSCODE_GOOD;
}
1051 | | |
1052 | | /****************/ |
1053 | | /* Notification */ |
1054 | | /****************/ |
1055 | | |
1056 | | #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
1057 | | |
/* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
 * "is generated when the first Event has to be discarded [...] without
 * discarding any other event". So only generate one for all deleted events. */
static UA_StatusCode
createEventOverflowNotification(UA_Server *server, UA_Subscription *sub,
                                UA_MonitoredItem *mon) {
    /* Avoid creating two adjacent overflow events. The "indicator" marks the
     * position in the per-MonitoredItem queue next to which the overflow
     * notification will be inserted. */
    UA_Notification *indicator = NULL;
    if(mon->parameters.discardOldest) {
        indicator = TAILQ_FIRST(&mon->queue);
        UA_assert(indicator); /* must exist */
        if(indicator->isOverflowEvent)
            return UA_STATUSCODE_GOOD;
    } else {
        indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
        UA_assert(indicator); /* must exist */
        /* Skip the last element. It is the recently added notification that
         * shall be kept. We know it is not an OverflowEvent. */
        UA_Notification *before = TAILQ_PREV(indicator, NotificationQueue, monEntry);
        if(before && before->isOverflowEvent)
            return UA_STATUSCODE_GOOD;
    }

    /* A Notification is inserted into the queue which includes only the
     * NodeId of the OverflowEventType. */

    /* Get the EventFilter */
    if(mon->parameters.filter.content.decoded.type != &UA_TYPES[UA_TYPES_EVENTFILTER])
        return UA_STATUSCODE_BADINTERNALERROR;
    const UA_EventFilter *ef = (const UA_EventFilter*)
        mon->parameters.filter.content.decoded.data;
    if(ef->selectClausesSize == 0)
        return UA_STATUSCODE_BADINTERNALERROR;

    /* Initialize the notification */
    UA_Notification *n = UA_Notification_new();
    if(!n)
        return UA_STATUSCODE_BADOUTOFMEMORY;
    n->isOverflowEvent = true;
    n->mon = mon;
    n->data.event.clientHandle = mon->parameters.clientHandle;

    /* The session is needed to evaluate the select-clause. But used only for
     * limited reads on the source node. So we can use the admin-session here if
     * the subscription is detached. */
    UA_Session *session = (sub->session) ? sub->session : &server->adminSession;

    /* Set up the context for the filter evaluation */
    static UA_String sourceName = UA_STRING_STATIC("Internal/EventQueueOverflow");
    UA_KeyValuePair fields[1];
    fields[0].key = (UA_QualifiedName){1, UA_STRING_STATIC("/SourceName")};
    UA_Variant_setScalar(&fields[0].value, &sourceName, &UA_TYPES[UA_TYPES_STRING]);
    UA_KeyValueMap fieldMap = {1, fields};

    UA_FilterEvalContext ctx;
    UA_FilterEvalContext_init(&ctx);
    ctx.server = server;
    ctx.session = session;
    ctx.filter = *ef;
    ctx.ed.sourceNode = UA_NS0ID(SERVER);
    ctx.ed.eventType = UA_NS0ID(EVENTQUEUEOVERFLOWEVENTTYPE);
    ctx.ed.severity = 201; /* TODO: Can this be configured? */
    ctx.ed.message = UA_LOCALIZEDTEXT(NULL, NULL);
    ctx.ed.eventFields = &fieldMap;

    /* Evaluate the select clause to populate the notification */
    UA_StatusCode res = evaluateSelectClause(&ctx, &n->data.event);
    UA_FilterEvalContext_reset(&ctx);
    if(res != UA_STATUSCODE_GOOD) {
        /* n is not enqueued yet, a plain free suffices */
        UA_free(n);
        return res;
    }

    /* Insert before the removed notification. This is either first in the
     * queue (if the oldest notification was removed) or before the new event
     * that remains the last element of the queue.
     *
     * Ensure that the following is consistent with UA_Notification_enqueueMon
     * and UA_Notification_enqueueSub! */
    TAILQ_INSERT_BEFORE(indicator, n, monEntry);
    ++mon->eventOverflows;
    ++mon->queueSize;

    /* Test for consistency: overflow events are never adjacent */
    UA_assert(mon->queueSize >= mon->eventOverflows);
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);

    if(TAILQ_NEXT(indicator, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
        /* Insert just before the indicator */
        TAILQ_INSERT_BEFORE(indicator, n, subEntry);
    } else {
        /* The indicator was not reporting or not added yet. */
        if(!mon->parameters.discardOldest) {
            /* Add last to the per-Subscription queue */
            TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
                              n, subEntry);
        } else {
            /* Find the oldest reported element. Add before that. */
            while(indicator) {
                indicator = TAILQ_PREV(indicator, NotificationQueue, monEntry);
                if(!indicator) {
                    TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
                                      n, subEntry);
                    break;
                }
                if(TAILQ_NEXT(indicator, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
                    TAILQ_INSERT_BEFORE(indicator, n, subEntry);
                    break;
                }
            }
        }
    }

    ++sub->notificationQueueSize;
    ++sub->eventNotifications;

    /* Update the diagnostics statistics */
#ifdef UA_ENABLE_DIAGNOSTICS
    sub->eventQueueOverFlowCount++;
#endif

    return UA_STATUSCODE_GOOD;
}
1181 | | |
1182 | | #endif |
1183 | | |
1184 | | /* Set the InfoBits that a datachange notification was removed */ |
1185 | | static void |
1186 | 0 | setOverflowInfoBits(UA_MonitoredItem *mon) { |
1187 | | /* Only for queues with more than one element */ |
1188 | 0 | if(mon->parameters.queueSize == 1) |
1189 | 0 | return; |
1190 | | |
1191 | 0 | UA_Notification *indicator = NULL; |
1192 | 0 | if(mon->parameters.discardOldest) { |
1193 | 0 | indicator = TAILQ_FIRST(&mon->queue); |
1194 | 0 | } else { |
1195 | 0 | indicator = TAILQ_LAST(&mon->queue, NotificationQueue); |
1196 | 0 | } |
1197 | 0 | UA_assert(indicator); /* must exist */ |
1198 | | |
1199 | 0 | indicator->data.dataChange.value.hasStatus = true; |
1200 | 0 | indicator->data.dataChange.value.status |= |
1201 | 0 | (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW); |
1202 | 0 | } |
1203 | | |
/* Remove the InfoBits when the queueSize was reduced to 1 */
void
UA_MonitoredItem_removeOverflowInfoBits(UA_MonitoredItem *mon) {
    /* Only applies to DataChange MonitoredItems with a queue size of one
     * (Event MonitoredItems carry no overflow InfoBits) */
    if(mon->parameters.queueSize > 1 ||
       mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
        return;

    /* Get the first notification */
    UA_Notification *n = TAILQ_FIRST(&mon->queue);
    if(!n)
        return;

    /* Assertion that at most one notification is in the queue */
    UA_assert(n == TAILQ_LAST(&mon->queue, NotificationQueue));

    /* Remove the InfoBits. Note that hasStatus is left untouched here;
     * presumably the status may carry other meaningful bits — confirm. */
    n->data.dataChange.value.status &= ~(UA_StatusCode)
        (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
}
1224 | | |
1225 | | /*****************/ |
1226 | | /* MonitoredItem */ |
1227 | | /*****************/ |
1228 | | |
/* Zero-initialize the MonitoredItem and set up its notification queue */
void
UA_MonitoredItem_init(UA_MonitoredItem *mon) {
    memset(mon, 0, sizeof(UA_MonitoredItem));
    TAILQ_INIT(&mon->queue);
    mon->triggeredUntil = UA_INT64_MIN; /* Not triggered by a linked item */
}
1235 | | |
/* Node-editing callback: prepend the MonitoredItem (passed via data) to the
 * node's singly-linked list of monitoring backpointers */
static UA_StatusCode
addMonitoredItemBackpointer(UA_Server *server, UA_Session *session,
                            UA_Node *node, void *data) {
    UA_MonitoredItem *mon = (UA_MonitoredItem*)data;
    UA_assert(mon != (UA_MonitoredItem*)~0); /* Guard against the sentinel value */
    mon->sampling.nodeListNext = node->head.monitoredItems;
    node->head.monitoredItems = mon;
    return UA_STATUSCODE_GOOD;
}
1245 | | |
1246 | | static UA_StatusCode |
1247 | | removeMonitoredItemBackPointer(UA_Server *server, UA_Session *session, |
1248 | 0 | UA_Node *node, void *data) { |
1249 | 0 | if(!node->head.monitoredItems) |
1250 | 0 | return UA_STATUSCODE_GOOD; |
1251 | | |
1252 | | /* Edge case that it's the first element */ |
1253 | 0 | UA_MonitoredItem *remove = (UA_MonitoredItem*)data; |
1254 | 0 | if(node->head.monitoredItems == remove) { |
1255 | 0 | node->head.monitoredItems = remove->sampling.nodeListNext; |
1256 | 0 | return UA_STATUSCODE_GOOD; |
1257 | 0 | } |
1258 | | |
1259 | 0 | UA_MonitoredItem *prev = node->head.monitoredItems; |
1260 | 0 | UA_MonitoredItem *entry = prev->sampling.nodeListNext; |
1261 | 0 | for(; entry != NULL; prev = entry, entry = entry->sampling.nodeListNext) { |
1262 | 0 | if(entry == remove) { |
1263 | 0 | prev->sampling.nodeListNext = entry->sampling.nodeListNext; |
1264 | 0 | break; |
1265 | 0 | } |
1266 | 0 | } |
1267 | |
|
1268 | 0 | return UA_STATUSCODE_GOOD; |
1269 | 0 | } |
1270 | | |
1271 | | void |
1272 | 0 | UA_MonitoredItem_register(UA_Server *server, UA_MonitoredItem *mon) { |
1273 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
1274 | |
|
1275 | 0 | if(mon->registered) |
1276 | 0 | return; |
1277 | 0 | mon->registered = true; |
1278 | | |
1279 | | /* Register in Subscription and Server */ |
1280 | 0 | UA_Subscription *sub = mon->subscription; |
1281 | 0 | mon->monitoredItemId = ++sub->lastMonitoredItemId; |
1282 | 0 | mon->subscription = sub; |
1283 | 0 | LIST_INSERT_HEAD(&sub->monitoredItems, mon, listEntry); |
1284 | 0 | sub->monitoredItemsSize++; |
1285 | 0 | server->monitoredItemsSize++; |
1286 | | |
1287 | | /* Register the MonitoredItem in userland */ |
1288 | 0 | if(server->config.monitoredItemRegisterCallback) { |
1289 | 0 | UA_Session *session = sub->session; |
1290 | 0 | void *targetContext = NULL; |
1291 | 0 | getNodeContext(server, mon->itemToMonitor.nodeId, &targetContext); |
1292 | 0 | server->config.monitoredItemRegisterCallback(server, |
1293 | 0 | session ? &session->sessionId : NULL, |
1294 | 0 | session ? session->context : NULL, |
1295 | 0 | &mon->itemToMonitor.nodeId, |
1296 | 0 | targetContext, |
1297 | 0 | mon->itemToMonitor.attributeId, false); |
1298 | 0 | } |
1299 | 0 | } |
1300 | | |
/* Counterpart to UA_MonitoredItem_register: remove the MonitoredItem from the
 * Subscription and server-wide bookkeeping and notify the userland callback
 * with removed=true. Idempotent for unregistered items. */
static void
UA_Server_unregisterMonitoredItem(UA_Server *server, UA_MonitoredItem *mon) {
    UA_LOCK_ASSERT(&server->serviceMutex);

    if(!mon->registered)
        return;
    mon->registered = false;

    UA_Subscription *sub = mon->subscription;
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub,
                             "MonitoredItem %" PRIi32 " | Deleting the MonitoredItem",
                             mon->monitoredItemId);

    /* Deregister MonitoredItem in userland */
    if(server->config.monitoredItemRegisterCallback) {
        UA_Session *session = sub->session;
        void *targetContext = NULL;
        getNodeContext(server, mon->itemToMonitor.nodeId, &targetContext);
        server->config.monitoredItemRegisterCallback(server,
                                                     session ? &session->sessionId : NULL,
                                                     session ? session->context : NULL,
                                                     &mon->itemToMonitor.nodeId,
                                                     targetContext,
                                                     mon->itemToMonitor.attributeId, true);
    }

    /* Deregister in Subscription and server */
    sub->monitoredItemsSize--;
    LIST_REMOVE(mon, listEntry);
    server->monitoredItemsSize--;
}
1332 | | |
/* Change the MonitoringMode. DISABLED drops all queued Notifications and the
 * last sample; REPORTING/SAMPLING move the queued Notifications into/out of
 * the Subscription's global queue and (re)register the sampling callback.
 * Returns BadMonitoringModeInvalid for out-of-range modes; a failure to
 * register sampling leaves the item DISABLED (notifications are kept). */
UA_StatusCode
UA_MonitoredItem_setMonitoringMode(UA_Server *server, UA_MonitoredItem *mon,
                                   UA_MonitoringMode monitoringMode) {
    /* Check if the MonitoringMode is valid or not */
    if(monitoringMode > UA_MONITORINGMODE_REPORTING)
        return UA_STATUSCODE_BADMONITORINGMODEINVALID;

    /* Set the MonitoringMode, store the old mode */
    UA_MonitoringMode oldMode = mon->monitoringMode;
    mon->monitoringMode = monitoringMode;

    UA_Notification *notification;
    /* Reporting is disabled. This causes all Notifications to be dequeued and
     * deleted. Also remove the last samples so that we immediately generate a
     * Notification when re-activated. */
    if(mon->monitoringMode == UA_MONITORINGMODE_DISABLED) {
        UA_Notification *notification_tmp;
        UA_MonitoredItem_unregisterSampling(server, mon);
        TAILQ_FOREACH_SAFE(notification, &mon->queue, monEntry, notification_tmp) {
            UA_Notification_delete(notification);
        }
        UA_DataValue_clear(&mon->lastValue);
        return UA_STATUSCODE_GOOD;
    }

    /* When reporting is enabled, put all notifications that were already
     * sampled into the global queue of the subscription. When sampling is
     * enabled, remove all notifications from the global queue. !!! This needs
     * to be the same operation as in UA_Notification_enqueue !!! */
    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
        /* Make all notifications reporting. Re-enqueue to ensure they have the
         * right order if some notifications are already reported by a trigger
         * link. */
        TAILQ_FOREACH(notification, &mon->queue, monEntry) {
            UA_Notification_dequeueSub(notification);
            UA_Notification_enqueueSub(notification);
        }
    } else /* mon->monitoringMode == UA_MONITORINGMODE_SAMPLING */ {
        /* Make all notifications non-reporting */
        TAILQ_FOREACH(notification, &mon->queue, monEntry)
            UA_Notification_dequeueSub(notification);
    }

    /* Register the sampling callback with an interval. If registering the
     * sampling callback failed, set to disabled. But don't delete the current
     * notifications. */
    UA_StatusCode res = UA_MonitoredItem_registerSampling(server, mon);
    if(res != UA_STATUSCODE_GOOD) {
        mon->monitoringMode = UA_MONITORINGMODE_DISABLED;
        return res;
    }

    /* Manually create the first sample if the MonitoredItem was disabled, the
     * MonitoredItem is now sampling (or reporting) and it is not an
     * Event-MonitoredItem */
    if(oldMode == UA_MONITORINGMODE_DISABLED &&
       mon->monitoringMode > UA_MONITORINGMODE_DISABLED &&
       mon->itemToMonitor.attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER)
        UA_MonitoredItem_sample(server, mon);

    return UA_STATUSCODE_GOOD;
}
1395 | | |
/* Delayed callback to free the MonitoredItem memory once the currently
 * scheduled jobs that might still reference it have completed */
static void
delayedFreeMonitoredItem(void *app, void *context) {
    UA_free(context);
}
1400 | | |
/* Tear down the MonitoredItem: stop sampling, deregister, cancel async
 * reads, remove triggering links, queued Notifications and settings. The
 * memory itself is released via a delayed callback so that jobs currently in
 * flight can still touch the item. */
void
UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *mon) {
    UA_LOCK_ASSERT(&server->serviceMutex);

    /* Remove the sampling callback */
    UA_MonitoredItem_unregisterSampling(server, mon);

    /* Deregister in Server and Subscription */
    if(mon->registered)
        UA_Server_unregisterMonitoredItem(server, mon);

    /* Cancel outstanding async reads. The status code avoids the sample being
     * processed. Call _processReady to ensure that the callbacks have been
     * triggered. */
    if(mon->outstandingAsyncReads > 0)
        async_cancel(server, mon, UA_STATUSCODE_BADREQUESTCANCELLEDBYREQUEST, true);
    UA_assert(mon->outstandingAsyncReads == 0);

    /* Remove the TriggeringLinks */
    if(mon->triggeringLinksSize > 0) {
        UA_free(mon->triggeringLinks);
        mon->triggeringLinks = NULL;
        mon->triggeringLinksSize = 0;
    }

    /* Remove the queued notifications attached to the subscription */
    UA_Notification *notification, *notification_tmp;
    TAILQ_FOREACH_SAFE(notification, &mon->queue, monEntry, notification_tmp) {
        UA_Notification_delete(notification);
    }

    /* Remove the settings */
    UA_ReadValueId_clear(&mon->itemToMonitor);
    UA_MonitoringParameters_clear(&mon->parameters);

    /* Remove the last samples */
    UA_DataValue_clear(&mon->lastValue);

    /* If this is a local MonitoredItem, clean up additional values. The map
     * values are re-initialized before the clear so they are not freed here —
     * presumably they reference memory owned elsewhere (confirm). */
    if(mon->subscription == server->adminSubscription) {
        UA_LocalMonitoredItem *lm = (UA_LocalMonitoredItem*)mon;
        for(size_t i = 0; i < lm->eventFields.mapSize; i++)
            UA_Variant_init(&lm->eventFields.map[i].value);
        UA_KeyValueMap_clear(&lm->eventFields);
    }

    /* Add a delayed callback to remove the MonitoredItem when the current jobs
     * have completed. This is needed to allow that a local MonitoredItem can
     * remove itself in the callback. */
    mon->delayedFreePointers.callback = delayedFreeMonitoredItem;
    mon->delayedFreePointers.application = NULL;
    mon->delayedFreePointers.context = mon;
    UA_EventLoop *el = server->config.eventLoop;
    el->addDelayedCallback(el, &mon->delayedFreePointers);
}
1456 | | |
/* Enforce the configured queue size of a MonitoredItem. Drops notifications
 * (oldest or second-newest, depending on the DiscardOldest setting) until at
 * most mon->parameters.queueSize non-overflow notifications remain. If any
 * *reported* notification was dropped, an overflow indication is left behind
 * (an OverflowEvent for event items, the Overflow InfoBits for data items). */
void
UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
    /* There can be only one EventOverflow more than normal entries. Because
     * EventOverflows are never adjacent. */
    UA_assert(mon->queueSize >= mon->eventOverflows);
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);

    /* MonitoredItems are always attached to a Subscription */
    UA_Subscription *sub = mon->subscription;
    UA_assert(sub);

    /* Nothing to do: the queue (not counting overflow markers) fits */
    if(mon->queueSize - mon->eventOverflows <= mon->parameters.queueSize)
        return;

    /* Help clang-analyzer: remember the last deleted entry so the skip loops
     * below can assert they never revisit freed memory */
#if defined(UA_DEBUG) && defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
    UA_Notification *last_del = NULL;
    (void)last_del;
#endif

    /* Remove notifications until the required queue size is reached */
    UA_Boolean reporting = false;
    size_t remove = mon->queueSize - mon->eventOverflows - mon->parameters.queueSize;
    while(remove > 0) {
        /* The minimum queue size (without EventOverflows) is 1. At least two
         * notifications that are not EventOverflows are in the queue. */
        UA_assert(mon->queueSize - mon->eventOverflows >= 2);

        /* Select the next notification to delete. Skip over overflow events. */
        UA_Notification *del = NULL;
        if(mon->parameters.discardOldest) {
            /* Remove the oldest */
            del = TAILQ_FIRST(&mon->queue);
#if defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
            /* Skip overflow events (they are markers, not real payload) */
            while(del->isOverflowEvent) {
                del = TAILQ_NEXT(del, monEntry);
                UA_assert(del != last_del);
            }
#endif
        } else {
            /* Remove the second newest (to keep the up-to-date notification).
             * The last entry is not an OverflowEvent -- we just added it. */
            del = TAILQ_LAST(&mon->queue, NotificationQueue);
            del = TAILQ_PREV(del, NotificationQueue, monEntry);
#if defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
            /* skip overflow events */
            while(del->isOverflowEvent) {
                del = TAILQ_PREV(del, NotificationQueue, monEntry);
                UA_assert(del != last_del);
            }
#endif
        }

        UA_assert(del); /* There must be one entry that can be deleted */

        /* Only create OverflowEvents (and set InfoBits) if the notification
         * that is removed is reported. A sentinel subEntry.next marks entries
         * that are NOT enqueued in the per-Subscription reporting queue. */
        if(TAILQ_NEXT(del, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL)
            reporting = true;

        /* Move the entry after del in the per-MonitoredItem queue right after
         * del in the per-Subscription queue. So we don't starve MonitoredItems
         * with a high sampling interval in the Subscription queue by always
         * removing their first appearance in the per-Subscription queue.
         *
         * With MonitoringMode == SAMPLING, the Notifications are not (all) in
         * the per-Subscription queue. Don't reinsert in that case.
         *
         * For the reinsertion to work, first insert into the per-Subscription
         * queue. */
        if(TAILQ_NEXT(del, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
            UA_Notification *after_del = TAILQ_NEXT(del, monEntry);
            UA_assert(after_del); /* There must be one remaining element after del */
            if(TAILQ_NEXT(after_del, subEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
                TAILQ_REMOVE(&sub->notificationQueue, after_del, subEntry);
                TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, subEntry);
            }
        }

        remove--;

        /* Delete the notification and remove it from the queues */
        UA_Notification_delete(del);

        /* Update the subscription diagnostics statistics */
#ifdef UA_ENABLE_DIAGNOSTICS
        sub->monitoringQueueOverflowCount++;
#endif

        /* Help scan-analyzer: del is freed; assert it is no longer reachable
         * from any queue position the next iteration could start from */
#if defined(UA_DEBUG) && defined(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
        last_del = del;
#endif
        UA_assert(del != TAILQ_FIRST(&mon->queue));
        UA_assert(del != TAILQ_LAST(&mon->queue, NotificationQueue));
        UA_assert(del != TAILQ_PREV(TAILQ_LAST(&mon->queue, NotificationQueue),
                                    NotificationQueue, monEntry));
    }

    /* Leave an entry to indicate that notifications were removed */
    if(reporting) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
        if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
            createEventOverflowNotification(server, sub, mon);
        else
#endif
            setOverflowInfoBits(mon);
    }
}
1568 | | |
/* Sample a MonitoredItem with the server lock held. Registered as the
 * repeated-callback entry point for cyclic sampling (see
 * UA_MonitoredItem_registerSampling), where the callback fires outside the
 * service mutex and must take it before touching server state. */
static void
UA_MonitoredItem_lockAndSample(UA_Server *server, UA_MonitoredItem *mon) {
    lockServer(server);
    UA_MonitoredItem_sample(server, mon);
    unlockServer(server);
}
1575 | | |
1576 | | UA_StatusCode |
1577 | 0 | UA_MonitoredItem_registerSampling(UA_Server *server, UA_MonitoredItem *mon) { |
1578 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
1579 | | |
1580 | | /* Sampling is already registered */ |
1581 | 0 | if(mon->samplingType != UA_MONITOREDITEMSAMPLINGTYPE_NONE) |
1582 | 0 | return UA_STATUSCODE_GOOD; |
1583 | | |
1584 | | /* The subscription is attached to a session at this point */ |
1585 | 0 | UA_Subscription *sub = mon->subscription; |
1586 | 0 | if(!sub->session) |
1587 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
1588 | | |
1589 | 0 | UA_StatusCode res = UA_STATUSCODE_GOOD; |
1590 | 0 | if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER || |
1591 | 0 | mon->parameters.samplingInterval == 0.0) { |
1592 | | /* Add to the linked list in the node */ |
1593 | 0 | res = editNode(server, sub->session, &mon->itemToMonitor.nodeId, 0, |
1594 | 0 | UA_REFERENCETYPESET_NONE, UA_BROWSEDIRECTION_INVALID, |
1595 | 0 | addMonitoredItemBackpointer, mon); |
1596 | 0 | if(res == UA_STATUSCODE_GOOD) |
1597 | 0 | mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_EVENT; |
1598 | 0 | } else if(mon->parameters.samplingInterval == sub->publishingInterval) { |
1599 | | /* Add to the subscription for sampling before every publish */ |
1600 | 0 | LIST_INSERT_HEAD(&sub->samplingMonitoredItems, mon, |
1601 | 0 | sampling.subscriptionSampling); |
1602 | 0 | mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_PUBLISH; |
1603 | 0 | } else { |
1604 | | /* DataChange MonitoredItems with a positive sampling interval have a |
1605 | | * repeated callback. Other MonitoredItems are attached to the Node in a |
1606 | | * linked list of backpointers. */ |
1607 | 0 | res = addRepeatedCallback(server, |
1608 | 0 | (UA_ServerCallback)UA_MonitoredItem_lockAndSample, |
1609 | 0 | mon, mon->parameters.samplingInterval, |
1610 | 0 | &mon->sampling.callbackId); |
1611 | 0 | if(res == UA_STATUSCODE_GOOD) |
1612 | 0 | mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_CYCLIC; |
1613 | 0 | } |
1614 | |
|
1615 | 0 | return res; |
1616 | 0 | } |
1617 | | |
1618 | | void |
1619 | 0 | UA_MonitoredItem_unregisterSampling(UA_Server *server, UA_MonitoredItem *mon) { |
1620 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
1621 | |
|
1622 | 0 | switch(mon->samplingType) { |
1623 | 0 | case UA_MONITOREDITEMSAMPLINGTYPE_CYCLIC: |
1624 | | /* Remove repeated callback */ |
1625 | 0 | removeCallback(server, mon->sampling.callbackId); |
1626 | 0 | break; |
1627 | | |
1628 | 0 | case UA_MONITOREDITEMSAMPLINGTYPE_EVENT: { |
1629 | | /* Removing is always done with the AdminSession. So it also works when |
1630 | | * the Subscription has been detached from its Session. */ |
1631 | 0 | editNode(server, &server->adminSession, &mon->itemToMonitor.nodeId, 0, |
1632 | 0 | UA_REFERENCETYPESET_NONE, UA_BROWSEDIRECTION_INVALID, |
1633 | 0 | removeMonitoredItemBackPointer, mon); |
1634 | 0 | break; |
1635 | 0 | } |
1636 | | |
1637 | 0 | case UA_MONITOREDITEMSAMPLINGTYPE_PUBLISH: |
1638 | | /* Added to the subscription */ |
1639 | 0 | LIST_REMOVE(mon, sampling.subscriptionSampling); |
1640 | 0 | break; |
1641 | | |
1642 | 0 | case UA_MONITOREDITEMSAMPLINGTYPE_NONE: |
1643 | 0 | default: |
1644 | | /* Sampling is not registered */ |
1645 | 0 | break; |
1646 | 0 | } |
1647 | | |
1648 | 0 | mon->samplingType = UA_MONITOREDITEMSAMPLINGTYPE_NONE; |
1649 | 0 | } |
1650 | | |
1651 | | UA_StatusCode |
1652 | 0 | UA_MonitoredItem_removeLink(UA_Subscription *sub, UA_MonitoredItem *mon, UA_UInt32 linkId) { |
1653 | | /* Find the index */ |
1654 | 0 | size_t i = 0; |
1655 | 0 | for(; i < mon->triggeringLinksSize; i++) { |
1656 | 0 | if(mon->triggeringLinks[i] == linkId) |
1657 | 0 | break; |
1658 | 0 | } |
1659 | | |
1660 | | /* Not existing / already removed */ |
1661 | 0 | if(i == mon->triggeringLinksSize) |
1662 | 0 | return UA_STATUSCODE_BADMONITOREDITEMIDINVALID; |
1663 | | |
1664 | | /* Remove the link */ |
1665 | 0 | mon->triggeringLinksSize--; |
1666 | 0 | if(mon->triggeringLinksSize == 0) { |
1667 | 0 | UA_free(mon->triggeringLinks); |
1668 | 0 | mon->triggeringLinks = NULL; |
1669 | 0 | } else { |
1670 | 0 | mon->triggeringLinks[i] = mon->triggeringLinks[mon->triggeringLinksSize]; |
1671 | 0 | UA_UInt32 *tmpLinks = (UA_UInt32*) |
1672 | 0 | UA_realloc(mon->triggeringLinks, mon->triggeringLinksSize * sizeof(UA_UInt32)); |
1673 | 0 | if(tmpLinks) |
1674 | 0 | mon->triggeringLinks = tmpLinks; |
1675 | 0 | } |
1676 | | |
1677 | | /* Does the target MonitoredItem exist? This is stupid, but the CTT wants us |
1678 | | * to to this. We don't auto-remove links together with the target |
1679 | | * MonitoredItem. Links to removed MonitoredItems are removed when the link |
1680 | | * triggers and the target no longer exists. */ |
1681 | 0 | UA_MonitoredItem *mon2 = UA_Subscription_getMonitoredItem(sub, linkId); |
1682 | 0 | if(!mon2) |
1683 | 0 | return UA_STATUSCODE_BADMONITOREDITEMIDINVALID; |
1684 | | |
1685 | 0 | return UA_STATUSCODE_GOOD; |
1686 | 0 | } |
1687 | | |
1688 | | UA_StatusCode |
1689 | 0 | UA_MonitoredItem_addLink(UA_Subscription *sub, UA_MonitoredItem *mon, UA_UInt32 linkId) { |
1690 | | /* Does the target MonitoredItem exist? */ |
1691 | 0 | UA_MonitoredItem *mon2 = UA_Subscription_getMonitoredItem(sub, linkId); |
1692 | 0 | if(!mon2) |
1693 | 0 | return UA_STATUSCODE_BADMONITOREDITEMIDINVALID; |
1694 | | |
1695 | | /* Does the link already exist? */ |
1696 | 0 | for(size_t i = 0 ; i < mon->triggeringLinksSize; i++) { |
1697 | 0 | if(mon->triggeringLinks[i] == linkId) |
1698 | 0 | return UA_STATUSCODE_GOOD; |
1699 | 0 | } |
1700 | | |
1701 | | /* Allocate the memory */ |
1702 | 0 | UA_UInt32 *tmpLinkIds = (UA_UInt32*) |
1703 | 0 | UA_realloc(mon->triggeringLinks, (mon->triggeringLinksSize + 1) * sizeof(UA_UInt32)); |
1704 | 0 | if(!tmpLinkIds) |
1705 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
1706 | 0 | mon->triggeringLinks = tmpLinkIds; |
1707 | | |
1708 | | /* Add the link */ |
1709 | 0 | mon->triggeringLinks[mon->triggeringLinksSize] = linkId; |
1710 | 0 | mon->triggeringLinksSize++; |
1711 | 0 | return UA_STATUSCODE_GOOD; |
1712 | 0 | } |
1713 | | |
1714 | | #endif /* UA_ENABLE_SUBSCRIPTIONS */ |