/src/open62541/src/server/ua_services_subscription.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright 2014-2018, 2022 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
6 | | * Copyright 2016-2017 (c) Florian Palm |
7 | | * Copyright 2015 (c) Chris Iatrou |
8 | | * Copyright 2015-2016 (c) Sten GrĂ¼ner |
9 | | * Copyright 2015-2016 (c) Oleksiy Vasylyev |
10 | | * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
11 | | * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH |
12 | | * Copyright 2017 (c) Mattias Bornhager |
13 | | * Copyright 2017 (c) Henrik Norrman |
14 | | * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA |
15 | | * Copyright 2018 (c) Fabian Arndt, Root-Core |
16 | | * Copyright 2017-2019 (c) HMS Industrial Networks AB (Author: Jonas Green) |
17 | | */ |
18 | | |
19 | | #include "ua_server_internal.h" |
20 | | #include "ua_services.h" |
21 | | #include "ua_subscription.h" |
22 | | |
23 | | #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ |
24 | | |
25 | | static void |
26 | | setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription, |
27 | | UA_Double requestedPublishingInterval, |
28 | | UA_UInt32 requestedLifetimeCount, |
29 | | UA_UInt32 requestedMaxKeepAliveCount, |
30 | | UA_UInt32 maxNotificationsPerPublish, |
31 | 0 | UA_Byte priority) { |
32 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
33 | | |
34 | | /* re-parameterize the subscription */ |
35 | 0 | UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits, |
36 | 0 | requestedPublishingInterval, |
37 | 0 | subscription->publishingInterval); |
38 | | /* check for nan*/ |
39 | 0 | if(requestedPublishingInterval != requestedPublishingInterval) |
40 | 0 | subscription->publishingInterval = server->config.publishingIntervalLimits.min; |
41 | 0 | UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits, |
42 | 0 | requestedMaxKeepAliveCount, subscription->maxKeepAliveCount); |
43 | 0 | UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits, |
44 | 0 | requestedLifetimeCount, subscription->lifeTimeCount); |
45 | 0 | if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount) |
46 | 0 | subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount; |
47 | 0 | subscription->notificationsPerPublish = maxNotificationsPerPublish; |
48 | 0 | if(maxNotificationsPerPublish == 0 || |
49 | 0 | maxNotificationsPerPublish > server->config.maxNotificationsPerPublish) |
50 | 0 | subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish; |
51 | 0 | subscription->priority = priority; |
52 | 0 | } |
53 | | |
54 | | UA_Boolean |
55 | | Service_CreateSubscription(UA_Server *server, UA_Session *session, |
56 | | const UA_CreateSubscriptionRequest *request, |
57 | 0 | UA_CreateSubscriptionResponse *response) { |
58 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
59 | | |
60 | | /* Check limits for the number of subscriptions */ |
61 | 0 | if(((server->config.maxSubscriptions != 0) && |
62 | 0 | (server->subscriptionsSize >= server->config.maxSubscriptions)) || |
63 | 0 | ((server->config.maxSubscriptionsPerSession != 0) && |
64 | 0 | (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession))) { |
65 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS; |
66 | 0 | return true; |
67 | 0 | } |
68 | | |
69 | | /* Create the subscription */ |
70 | 0 | UA_Subscription *sub = UA_Subscription_new(); |
71 | 0 | if(!sub) { |
72 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
73 | 0 | "Processing CreateSubscriptionRequest failed"); |
74 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
75 | 0 | return true; |
76 | 0 | } |
77 | | |
78 | | /* Set the subscription parameters */ |
79 | 0 | setSubscriptionSettings(server, sub, request->requestedPublishingInterval, |
80 | 0 | request->requestedLifetimeCount, |
81 | 0 | request->requestedMaxKeepAliveCount, |
82 | 0 | request->maxNotificationsPerPublish, request->priority); |
83 | 0 | sub->subscriptionId = ++server->lastSubscriptionId; /* Assign the SubscriptionId */ |
84 | | |
85 | | /* Register the subscription in the server */ |
86 | 0 | LIST_INSERT_HEAD(&server->subscriptions, sub, serverListEntry); |
87 | 0 | server->subscriptionsSize++; |
88 | | |
89 | | /* Update the server statistics */ |
90 | 0 | server->serverDiagnosticsSummary.currentSubscriptionCount++; |
91 | 0 | server->serverDiagnosticsSummary.cumulatedSubscriptionCount++; |
92 | | |
93 | | /* Attach the Subscription to the session */ |
94 | 0 | UA_Session_attachSubscription(session, sub); |
95 | | |
96 | | /* Create representation in the Session object */ |
97 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
98 | 0 | createSubscriptionObject(server, session, sub); |
99 | 0 | #endif |
100 | | |
101 | | /* Set the subscription state. This also registers the callback. |
102 | | * Note that also a disabled subscription publishes keepalives. */ |
103 | 0 | UA_SubscriptionState sState = (request->publishingEnabled) ? |
104 | 0 | UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH; |
105 | 0 | UA_StatusCode res = Subscription_setState(server, sub, sState); |
106 | 0 | if(res != UA_STATUSCODE_GOOD) { |
107 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, sub->session, |
108 | 0 | "Subscription %" PRIu32 " | Could not register " |
109 | 0 | "publish callback with error code %s", |
110 | 0 | sub->subscriptionId, UA_StatusCode_name(res)); |
111 | 0 | response->responseHeader.serviceResult = res; |
112 | 0 | UA_Subscription_delete(server, sub); |
113 | 0 | return true; |
114 | 0 | } |
115 | | |
116 | 0 | UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub, |
117 | 0 | "Subscription created (Publishing interval %.2fms, " |
118 | 0 | "max %lu notifications per publish)", |
119 | 0 | sub->publishingInterval, |
120 | 0 | (long unsigned)sub->notificationsPerPublish); |
121 | | |
122 | | /* Prepare the response */ |
123 | 0 | response->subscriptionId = sub->subscriptionId; |
124 | 0 | response->revisedPublishingInterval = sub->publishingInterval; |
125 | 0 | response->revisedLifetimeCount = sub->lifeTimeCount; |
126 | 0 | response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount; |
127 | |
|
128 | 0 | return true; |
129 | 0 | } |
130 | | |
131 | | UA_Boolean |
132 | | Service_ModifySubscription(UA_Server *server, UA_Session *session, |
133 | | const UA_ModifySubscriptionRequest *request, |
134 | 0 | UA_ModifySubscriptionResponse *response) { |
135 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
136 | 0 | "Processing ModifySubscriptionRequest"); |
137 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
138 | |
|
139 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId); |
140 | 0 | if(!sub) { |
141 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
142 | 0 | return true; |
143 | 0 | } |
144 | | |
145 | | /* Store the old publishing interval */ |
146 | 0 | UA_Double oldPublishingInterval = sub->publishingInterval; |
147 | 0 | UA_Byte oldPriority = sub->priority; |
148 | | |
149 | | /* Change the Subscription settings */ |
150 | 0 | setSubscriptionSettings(server, sub, request->requestedPublishingInterval, |
151 | 0 | request->requestedLifetimeCount, |
152 | 0 | request->requestedMaxKeepAliveCount, |
153 | 0 | request->maxNotificationsPerPublish, request->priority); |
154 | | |
155 | | /* Reset the subscription lifetime */ |
156 | 0 | Subscription_resetLifetime(sub); |
157 | | |
158 | | /* The publish interval has changed */ |
159 | 0 | if(sub->publishingInterval != oldPublishingInterval) { |
160 | | /* Change the repeated callback to the new interval. This cannot fail as |
161 | | * memory is reused. */ |
162 | 0 | if(sub->publishCallbackId > 0) |
163 | 0 | changeRepeatedCallbackInterval(server, sub->publishCallbackId, |
164 | 0 | sub->publishingInterval); |
165 | | |
166 | | /* For each MonitoredItem check if it was/shall be attached to the |
167 | | * publish interval. This ensures that we have less cyclic callbacks |
168 | | * registered and that the notifications are fresh. */ |
169 | 0 | UA_MonitoredItem *mon; |
170 | 0 | LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
171 | 0 | if(mon->parameters.samplingInterval == sub->publishingInterval || |
172 | 0 | mon->parameters.samplingInterval == oldPublishingInterval) { |
173 | 0 | UA_MonitoredItem_unregisterSampling(server, mon); |
174 | 0 | UA_MonitoredItem_registerSampling(server, mon); |
175 | 0 | } |
176 | 0 | } |
177 | 0 | } |
178 | | |
179 | | /* If the priority has changed, re-enter the subscription to the |
180 | | * priority-ordered queue in the session. */ |
181 | 0 | if(oldPriority != sub->priority) { |
182 | 0 | UA_Session_detachSubscription(server, session, sub, false); |
183 | 0 | UA_Session_attachSubscription(session, sub); |
184 | 0 | } |
185 | | |
186 | | /* Set the response */ |
187 | 0 | response->revisedPublishingInterval = sub->publishingInterval; |
188 | 0 | response->revisedLifetimeCount = sub->lifeTimeCount; |
189 | 0 | response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount; |
190 | | |
191 | | /* Update the diagnostics statistics */ |
192 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
193 | 0 | sub->modifyCount++; |
194 | 0 | #endif |
195 | |
|
196 | 0 | return true; |
197 | 0 | } |
198 | | |
199 | | static void |
200 | | Operation_SetPublishingMode(UA_Server *server, UA_Session *session, |
201 | | const UA_Boolean *publishingEnabled, |
202 | | const UA_UInt32 *subscriptionId, |
203 | 0 | UA_StatusCode *result) { |
204 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
205 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId); |
206 | 0 | if(!sub) { |
207 | 0 | *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
208 | 0 | return; |
209 | 0 | } |
210 | | |
211 | | /* Enable/disable */ |
212 | 0 | UA_SubscriptionState sState = (*publishingEnabled) ? |
213 | 0 | UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH; |
214 | 0 | *result = Subscription_setState(server, sub, sState); |
215 | | |
216 | | /* Reset the lifetime counter */ |
217 | 0 | Subscription_resetLifetime(sub); |
218 | 0 | } |
219 | | |
220 | | UA_Boolean |
221 | | Service_SetPublishingMode(UA_Server *server, UA_Session *session, |
222 | | const UA_SetPublishingModeRequest *request, |
223 | 0 | UA_SetPublishingModeResponse *response) { |
224 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
225 | 0 | "Processing SetPublishingModeRequest"); |
226 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
227 | |
|
228 | 0 | UA_Boolean publishingEnabled = request->publishingEnabled; /* request is const */ |
229 | 0 | response->responseHeader.serviceResult = |
230 | 0 | allocProcessServiceOperations(server, session, |
231 | 0 | (UA_ServiceOperation)Operation_SetPublishingMode, |
232 | 0 | &publishingEnabled, &request->subscriptionIdsSize, |
233 | 0 | &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize, |
234 | 0 | &UA_TYPES[UA_TYPES_STATUSCODE]); |
235 | |
|
236 | 0 | return true; |
237 | 0 | } |
238 | | |
239 | | UA_Boolean |
240 | | Service_Publish(UA_Server *server, UA_Session *session, |
241 | | const UA_PublishRequest *request, |
242 | 0 | UA_PublishResponse *response) { |
243 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
244 | 0 | "Processing PublishRequest with RequestId %u", |
245 | 0 | server->asyncManager.currentRequestId); |
246 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
247 | | |
248 | | /* Return an error if the session has no subscription */ |
249 | 0 | if(TAILQ_EMPTY(&session->subscriptions)) { |
250 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION; |
251 | 0 | return true; |
252 | 0 | } |
253 | | |
254 | | /* Handle too many subscriptions to free resources before trying to allocate |
255 | | * resources for the new publish request. If the limit has been reached the |
256 | | * oldest publish request are returned with an error message. */ |
257 | 0 | UA_Session_ensurePublishQueueSpace(server, session); |
258 | | |
259 | | /* Allocate the response to store it in the retransmission queue */ |
260 | 0 | UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *) |
261 | 0 | UA_malloc(sizeof(UA_PublishResponseEntry)); |
262 | 0 | if(!entry) { |
263 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
264 | 0 | return true; |
265 | 0 | } |
266 | | |
267 | | /* Prepare the response */ |
268 | 0 | UA_PublishResponse *entry_response = &entry->response; |
269 | 0 | UA_PublishResponse_init(entry_response); |
270 | | |
271 | | /* Allocate the results array to acknowledge the acknowledge */ |
272 | 0 | if(request->subscriptionAcknowledgementsSize > 0) { |
273 | 0 | entry_response->results = (UA_StatusCode *) |
274 | 0 | UA_Array_new(request->subscriptionAcknowledgementsSize, |
275 | 0 | &UA_TYPES[UA_TYPES_STATUSCODE]); |
276 | 0 | if(!entry_response->results) { |
277 | 0 | UA_free(entry); |
278 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
279 | 0 | return true; |
280 | 0 | } |
281 | 0 | entry_response->resultsSize = request->subscriptionAcknowledgementsSize; |
282 | 0 | } |
283 | | |
284 | | /* <--- Async response from here on ---> */ |
285 | | |
286 | 0 | entry->requestId = server->asyncManager.currentRequestId; |
287 | 0 | entry_response->responseHeader.requestHandle = request->requestHeader.requestHandle; |
288 | | |
289 | | /* Delete Acknowledged Subscription Messages */ |
290 | 0 | for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) { |
291 | 0 | UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i]; |
292 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId); |
293 | 0 | if(!sub) { |
294 | 0 | entry_response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
295 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
296 | 0 | "Cannot process acknowledgements subscription %u" PRIu32, |
297 | 0 | ack->subscriptionId); |
298 | 0 | continue; |
299 | 0 | } |
300 | | /* Remove the acked transmission from the retransmission queue */ |
301 | 0 | entry_response->results[i] = |
302 | 0 | UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber); |
303 | 0 | } |
304 | | |
305 | | /* Set the maxTime if a timeout hint is defined */ |
306 | 0 | entry->maxTime = UA_INT64_MAX; |
307 | 0 | if(request->requestHeader.timeoutHint > 0) { |
308 | 0 | UA_EventLoop *el = server->config.eventLoop; |
309 | 0 | entry->maxTime = el->dateTime_nowMonotonic(el) + |
310 | 0 | (request->requestHeader.timeoutHint * UA_DATETIME_MSEC); |
311 | 0 | } |
312 | | |
313 | | /* Queue the publish response. It will be dequeued in a repeated publish |
314 | | * callback. This can also be triggered right now for a late |
315 | | * subscription. */ |
316 | 0 | UA_Session_queuePublishReq(session, entry, false); |
317 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, "Queued a publication message"); |
318 | | |
319 | | /* If there are late subscriptions, the new publish request is used to |
320 | | * answer them immediately. Late subscriptions with higher priority are |
321 | | * considered earlier. However, a single subscription that generates many |
322 | | * notifications must not "starve" other late subscriptions. Hence we move |
323 | | * it to the end of the queue for the subscriptions of that priority. */ |
324 | 0 | UA_Subscription *late, *late_tmp; |
325 | 0 | TAILQ_FOREACH_SAFE(late, &session->subscriptions, sessionListEntry, late_tmp) { |
326 | | /* Skip non-late subscriptions */ |
327 | 0 | if(!late->late) |
328 | 0 | continue; |
329 | | |
330 | | /* Call publish on the late subscription */ |
331 | 0 | UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, late, |
332 | 0 | "Send PublishResponse on a late subscription"); |
333 | 0 | UA_Subscription_publish(server, late); |
334 | | |
335 | | /* Skip re-insert if the subscription was deleted or deactivated during |
336 | | * _publish */ |
337 | 0 | if(late->state >= UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH) { |
338 | | /* Find the first element with smaller priority and insert before |
339 | | * that. If there is none, insert at the end of the queue. */ |
340 | 0 | UA_Subscription *after = TAILQ_NEXT(late, sessionListEntry); |
341 | 0 | while(after && after->priority >= late->priority) |
342 | 0 | after = TAILQ_NEXT(after, sessionListEntry); |
343 | 0 | TAILQ_REMOVE(&session->subscriptions, late, sessionListEntry); |
344 | 0 | if(after) |
345 | 0 | TAILQ_INSERT_BEFORE(after, late, sessionListEntry); |
346 | 0 | else |
347 | 0 | TAILQ_INSERT_TAIL(&session->subscriptions, late, sessionListEntry); |
348 | 0 | } |
349 | | |
350 | | /* Responses left in the queue? */ |
351 | 0 | if(session->responseQueueSize == 0) |
352 | 0 | break; |
353 | 0 | } |
354 | |
|
355 | 0 | return false; |
356 | 0 | } |
357 | | |
358 | | static void |
359 | | Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_, |
360 | 0 | const UA_UInt32 *subscriptionId, UA_StatusCode *result) { |
361 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId); |
362 | 0 | if(!sub) { |
363 | 0 | *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
364 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
365 | 0 | "Deleting Subscription with Id %" PRIu32 |
366 | 0 | " failed with error code %s", |
367 | 0 | *subscriptionId, UA_StatusCode_name(*result)); |
368 | 0 | return; |
369 | 0 | } |
370 | | |
371 | 0 | UA_Subscription_delete(server, sub); |
372 | 0 | *result = UA_STATUSCODE_GOOD; |
373 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
374 | 0 | "Subscription %" PRIu32 " | Subscription deleted", |
375 | 0 | *subscriptionId); |
376 | 0 | } |
377 | | |
378 | | UA_Boolean |
379 | | Service_DeleteSubscriptions(UA_Server *server, UA_Session *session, |
380 | | const UA_DeleteSubscriptionsRequest *request, |
381 | 0 | UA_DeleteSubscriptionsResponse *response) { |
382 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
383 | 0 | "Processing DeleteSubscriptionsRequest"); |
384 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
385 | |
|
386 | 0 | response->responseHeader.serviceResult = |
387 | 0 | allocProcessServiceOperations(server, session, |
388 | 0 | (UA_ServiceOperation)Operation_DeleteSubscription, |
389 | 0 | NULL, &request->subscriptionIdsSize, |
390 | 0 | &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize, |
391 | 0 | &UA_TYPES[UA_TYPES_STATUSCODE]); |
392 | 0 | return true; |
393 | 0 | } |
394 | | |
395 | | UA_Boolean |
396 | | Service_Republish(UA_Server *server, UA_Session *session, |
397 | | const UA_RepublishRequest *request, |
398 | 0 | UA_RepublishResponse *response) { |
399 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
400 | 0 | "Processing RepublishRequest"); |
401 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
402 | | |
403 | | /* Get the subscription */ |
404 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId); |
405 | 0 | if(!sub) { |
406 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
407 | 0 | return true; |
408 | 0 | } |
409 | | |
410 | | /* Reset the lifetime counter */ |
411 | 0 | Subscription_resetLifetime(sub); |
412 | | |
413 | | /* Update the subscription statistics */ |
414 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
415 | 0 | sub->republishRequestCount++; |
416 | 0 | #endif |
417 | | |
418 | | /* Find the notification in the retransmission queue */ |
419 | 0 | UA_NotificationMessageEntry *entry; |
420 | 0 | TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
421 | 0 | if(entry->message.sequenceNumber == request->retransmitSequenceNumber) |
422 | 0 | break; |
423 | 0 | } |
424 | 0 | if(!entry) { |
425 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE; |
426 | 0 | return true; |
427 | 0 | } |
428 | | |
429 | 0 | response->responseHeader.serviceResult = |
430 | 0 | UA_NotificationMessage_copy(&entry->message, &response->notificationMessage); |
431 | | |
432 | | /* Update the subscription statistics for the case where we return a message */ |
433 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
434 | 0 | sub->republishMessageCount++; |
435 | 0 | #endif |
436 | |
|
437 | 0 | return true; |
438 | 0 | } |
439 | | |
440 | | static UA_StatusCode |
441 | 0 | setTransferredSequenceNumbers(const UA_Subscription *sub, UA_TransferResult *result) { |
442 | | /* Allocate memory */ |
443 | 0 | result->availableSequenceNumbers = (UA_UInt32*) |
444 | 0 | UA_Array_new(sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]); |
445 | 0 | if(!result->availableSequenceNumbers) |
446 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
447 | 0 | result->availableSequenceNumbersSize = sub->retransmissionQueueSize; |
448 | | |
449 | | /* Copy over the sequence numbers */ |
450 | 0 | UA_NotificationMessageEntry *entry; |
451 | 0 | size_t i = 0; |
452 | 0 | TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
453 | 0 | result->availableSequenceNumbers[i] = entry->message.sequenceNumber; |
454 | 0 | i++; |
455 | 0 | } |
456 | |
|
457 | 0 | UA_assert(i == result->availableSequenceNumbersSize); |
458 | | |
459 | 0 | return UA_STATUSCODE_GOOD; |
460 | 0 | } |
461 | | |
462 | | static void |
463 | | Operation_TransferSubscription(UA_Server *server, UA_Session *session, |
464 | | const UA_Boolean *sendInitialValues, |
465 | | const UA_UInt32 *subscriptionId, |
466 | 0 | UA_TransferResult *result) { |
467 | | /* Get the subscription. This requires a server-wide lookup instead of the |
468 | | * usual session-wide lookup. */ |
469 | 0 | UA_Subscription *sub = getSubscriptionById(server, *subscriptionId); |
470 | 0 | if(!sub) { |
471 | 0 | result->statusCode = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
472 | 0 | return; |
473 | 0 | } |
474 | | |
475 | | /* Update the diagnostics statistics */ |
476 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
477 | 0 | sub->transferRequestCount++; |
478 | 0 | #endif |
479 | | |
480 | | /* Is this the same session? Return the sequence numbers and do nothing else. */ |
481 | 0 | UA_Session *oldSession = sub->session; |
482 | 0 | if(oldSession == session) { |
483 | 0 | result->statusCode = setTransferredSequenceNumbers(sub, result); |
484 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
485 | 0 | sub->transferredToSameClientCount++; |
486 | 0 | #endif |
487 | 0 | return; |
488 | 0 | } |
489 | | |
490 | | /* Check with AccessControl if the transfer is allowed */ |
491 | 0 | if(server->config.accessControl.allowTransferSubscription) { |
492 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
493 | 0 | if(!server->config.accessControl. |
494 | 0 | allowTransferSubscription(server, &server->config.accessControl, |
495 | 0 | oldSession ? &oldSession->sessionId : NULL, |
496 | 0 | oldSession ? oldSession->context : NULL, |
497 | 0 | &session->sessionId, session->context)) { |
498 | 0 | result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED; |
499 | 0 | return; |
500 | 0 | } |
501 | 0 | } else { |
502 | 0 | result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED; |
503 | 0 | return; |
504 | 0 | } |
505 | | |
506 | | /* Check limits for the number of subscriptions for this Session */ |
507 | 0 | if((server->config.maxSubscriptionsPerSession != 0) && |
508 | 0 | (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession)) { |
509 | 0 | result->statusCode = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS; |
510 | 0 | return; |
511 | 0 | } |
512 | | |
513 | | /* Allocate memory for the new subscription */ |
514 | 0 | UA_Subscription *newSub = (UA_Subscription*)UA_malloc(sizeof(UA_Subscription)); |
515 | 0 | if(!newSub) { |
516 | 0 | result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY; |
517 | 0 | return; |
518 | 0 | } |
519 | | |
520 | | /* Set the available sequence numbers */ |
521 | 0 | result->statusCode = setTransferredSequenceNumbers(sub, result); |
522 | 0 | if(result->statusCode != UA_STATUSCODE_GOOD) { |
523 | 0 | UA_free(newSub); |
524 | 0 | return; |
525 | 0 | } |
526 | | |
527 | | /* Create an identical copy of the Subscription struct. The original |
528 | | * subscription remains in place until a StatusChange notification has been |
529 | | * sent. The elements for lists and queues are moved over manually to ensure |
530 | | * that all backpointers are set correctly. */ |
531 | 0 | memcpy(newSub, sub, sizeof(UA_Subscription)); |
532 | | |
533 | | /* Set to the same state as the original subscription */ |
534 | 0 | newSub->publishCallbackId = 0; |
535 | 0 | result->statusCode = Subscription_setState(server, newSub, sub->state); |
536 | 0 | if(result->statusCode != UA_STATUSCODE_GOOD) { |
537 | 0 | UA_Array_delete(result->availableSequenceNumbers, |
538 | 0 | sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]); |
539 | 0 | result->availableSequenceNumbers = NULL; |
540 | 0 | result->availableSequenceNumbersSize = 0; |
541 | 0 | UA_free(newSub); |
542 | 0 | return; |
543 | 0 | } |
544 | | |
545 | | /* <-- The point of no return --> */ |
546 | | |
547 | | /* Move over the MonitoredItems and adjust the backpointers */ |
548 | 0 | LIST_INIT(&newSub->monitoredItems); |
549 | 0 | UA_MonitoredItem *mon, *mon_tmp; |
550 | 0 | LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) { |
551 | 0 | LIST_REMOVE(mon, listEntry); |
552 | 0 | mon->subscription = newSub; |
553 | 0 | LIST_INSERT_HEAD(&newSub->monitoredItems, mon, listEntry); |
554 | 0 | } |
555 | 0 | sub->monitoredItemsSize = 0; |
556 | | |
557 | | /* Move over the notification queue */ |
558 | 0 | TAILQ_INIT(&newSub->notificationQueue); |
559 | 0 | UA_Notification *nn, *nn_tmp; |
560 | 0 | TAILQ_FOREACH_SAFE(nn, &sub->notificationQueue, subEntry, nn_tmp) { |
561 | 0 | TAILQ_REMOVE(&sub->notificationQueue, nn, subEntry); |
562 | 0 | TAILQ_INSERT_TAIL(&newSub->notificationQueue, nn, subEntry); |
563 | 0 | } |
564 | 0 | sub->notificationQueueSize = 0; |
565 | 0 | sub->dataChangeNotifications = 0; |
566 | 0 | sub->eventNotifications = 0; |
567 | |
|
568 | 0 | TAILQ_INIT(&newSub->retransmissionQueue); |
569 | 0 | UA_NotificationMessageEntry *nme, *nme_tmp; |
570 | 0 | TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) { |
571 | 0 | TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry); |
572 | 0 | TAILQ_INSERT_TAIL(&newSub->retransmissionQueue, nme, listEntry); |
573 | 0 | if(oldSession) |
574 | 0 | oldSession->totalRetransmissionQueueSize -= 1; |
575 | 0 | sub->retransmissionQueueSize -= 1; |
576 | 0 | } |
577 | 0 | UA_assert(sub->retransmissionQueueSize == 0); |
578 | 0 | sub->retransmissionQueueSize = 0; |
579 | | |
580 | | /* Add to the server */ |
581 | 0 | UA_assert(newSub->subscriptionId == sub->subscriptionId); |
582 | 0 | LIST_INSERT_HEAD(&server->subscriptions, newSub, serverListEntry); |
583 | 0 | server->subscriptionsSize++; |
584 | | |
585 | | /* Attach to the session */ |
586 | 0 | UA_Session_attachSubscription(session, newSub); |
587 | |
|
588 | 0 | UA_LOG_INFO_SUBSCRIPTION(server->config.logging, newSub, "Transferred to this Session"); |
589 | | |
590 | | /* Set StatusChange in the original subscription and force publish. This |
591 | | * also removes the Subscription, even if there was no PublishResponse |
592 | | * queued to send a StatusChangeNotification. */ |
593 | 0 | sub->statusChange = UA_STATUSCODE_GOODSUBSCRIPTIONTRANSFERRED; |
594 | 0 | UA_Subscription_publish(server, sub); |
595 | | |
596 | | /* Re-create notifications with the current values for the new subscription */ |
597 | 0 | if(*sendInitialValues) |
598 | 0 | UA_Subscription_resendData(server, newSub); |
599 | | |
600 | | /* Do not update the statistics for the number of Subscriptions here. The |
601 | | * fact that we duplicate the subscription and move over the content is just |
602 | | * an implementtion detail. |
603 | | * server->serverDiagnosticsSummary.currentSubscriptionCount++; |
604 | | * server->serverDiagnosticsSummary.cumulatedSubscriptionCount++; |
605 | | * |
606 | | * Update the diagnostics statistics: */ |
607 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
608 | 0 | if(oldSession && |
609 | 0 | UA_equal(&oldSession->clientDescription, &session->clientDescription, |
610 | 0 | &UA_TYPES[UA_TYPES_APPLICATIONDESCRIPTION])) |
611 | 0 | sub->transferredToSameClientCount++; |
612 | 0 | else |
613 | 0 | sub->transferredToAltClientCount++; |
614 | 0 | #endif |
615 | 0 | } |
616 | | |
617 | | UA_Boolean |
618 | | Service_TransferSubscriptions(UA_Server *server, UA_Session *session, |
619 | | const UA_TransferSubscriptionsRequest *request, |
620 | 0 | UA_TransferSubscriptionsResponse *response) { |
621 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
622 | 0 | "Processing TransferSubscriptionsRequest"); |
623 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
624 | |
|
625 | 0 | response->responseHeader.serviceResult = |
626 | 0 | allocProcessServiceOperations(server, session, |
627 | 0 | (UA_ServiceOperation)Operation_TransferSubscription, |
628 | 0 | &request->sendInitialValues, |
629 | 0 | &request->subscriptionIdsSize, |
630 | 0 | &UA_TYPES[UA_TYPES_UINT32], |
631 | 0 | &response->resultsSize, |
632 | 0 | &UA_TYPES[UA_TYPES_TRANSFERRESULT]); |
633 | 0 | return true; |
634 | 0 | } |
635 | | |
636 | | #endif /* UA_ENABLE_SUBSCRIPTIONS */ |