/src/open62541/src/server/ua_services_subscription.c
Line | Count | Source |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright 2014-2018, 2022 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
6 | | * Copyright 2016-2017 (c) Florian Palm |
7 | | * Copyright 2015 (c) Chris Iatrou |
8 | | * Copyright 2015-2016 (c) Sten Grüner |
9 | | * Copyright 2015-2016 (c) Oleksiy Vasylyev |
10 | | * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
11 | | * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH |
12 | | * Copyright 2017 (c) Mattias Bornhager |
13 | | * Copyright 2017 (c) Henrik Norrman |
14 | | * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA |
15 | | * Copyright 2018 (c) Fabian Arndt, Root-Core |
16 | | * Copyright 2017-2019 (c) HMS Industrial Networks AB (Author: Jonas Green) |
17 | | */ |
18 | | |
19 | | #include "ua_server_internal.h" |
20 | | #include "ua_services.h" |
21 | | #include "ua_subscription.h" |
22 | | |
23 | | #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ |
24 | | |
25 | | static void |
26 | | setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription, |
27 | | UA_Double requestedPublishingInterval, |
28 | | UA_UInt32 requestedLifetimeCount, |
29 | | UA_UInt32 requestedMaxKeepAliveCount, |
30 | | UA_UInt32 maxNotificationsPerPublish, |
31 | 0 | UA_Byte priority) { |
32 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
33 | | |
34 | | /* re-parameterize the subscription */ |
35 | 0 | UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits, |
36 | 0 | requestedPublishingInterval, |
37 | 0 | subscription->publishingInterval); |
38 | | /* check for nan*/ |
39 | 0 | if(requestedPublishingInterval != requestedPublishingInterval) |
40 | 0 | subscription->publishingInterval = server->config.publishingIntervalLimits.min; |
41 | 0 | UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits, |
42 | 0 | requestedMaxKeepAliveCount, subscription->maxKeepAliveCount); |
43 | 0 | UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits, |
44 | 0 | requestedLifetimeCount, subscription->lifeTimeCount); |
45 | 0 | if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount) |
46 | 0 | subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount; |
47 | 0 | subscription->notificationsPerPublish = maxNotificationsPerPublish; |
48 | 0 | if(maxNotificationsPerPublish == 0 || |
49 | 0 | maxNotificationsPerPublish > server->config.maxNotificationsPerPublish) |
50 | 0 | subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish; |
51 | 0 | subscription->priority = priority; |
52 | 0 | } |
53 | | |
54 | | static void |
55 | | notifySubscription(UA_Server *server, UA_Subscription *sub, |
56 | 0 | UA_ApplicationNotificationType type) { |
57 | 0 | if(!server->config.subscriptionNotificationCallback && |
58 | 0 | !server->config.globalNotificationCallback) |
59 | 0 | return; |
60 | | |
61 | 0 | static UA_THREAD_LOCAL UA_KeyValuePair createSubData[8] = { |
62 | 0 | {{0, UA_STRING_STATIC("session-id")}, {0}}, |
63 | 0 | {{0, UA_STRING_STATIC("subscription-id")}, {0}}, |
64 | 0 | {{0, UA_STRING_STATIC("publishing-interval")}, {0}}, |
65 | 0 | {{0, UA_STRING_STATIC("lifetime-count")}, {0}}, |
66 | 0 | {{0, UA_STRING_STATIC("max-keepalive-count")}, {0}}, |
67 | 0 | {{0, UA_STRING_STATIC("max-notifications-per-publish")}, {0}}, |
68 | 0 | {{0, UA_STRING_STATIC("priority")}, {0}}, |
69 | 0 | {{0, UA_STRING_STATIC("publishing-enabled")}, {0}} |
70 | 0 | }; |
71 | 0 | UA_KeyValueMap createSubMap = {8, createSubData}; |
72 | |
|
73 | 0 | UA_NodeId sessionId = (sub->session) ? sub->session->sessionId : UA_NODEID_NULL; |
74 | 0 | UA_Boolean enabled = (sub->state = UA_SUBSCRIPTIONSTATE_ENABLED); |
75 | |
|
76 | 0 | UA_Variant_setScalar(&createSubData[0].value, &sessionId, |
77 | 0 | &UA_TYPES[UA_TYPES_NODEID]); |
78 | 0 | UA_Variant_setScalar(&createSubData[1].value, &sub->subscriptionId, |
79 | 0 | &UA_TYPES[UA_TYPES_UINT32]); |
80 | 0 | UA_Variant_setScalar(&createSubData[2].value, &sub->publishingInterval, |
81 | 0 | &UA_TYPES[UA_TYPES_DOUBLE]); |
82 | 0 | UA_Variant_setScalar(&createSubData[3].value, &sub->lifeTimeCount, |
83 | 0 | &UA_TYPES[UA_TYPES_UINT32]); |
84 | 0 | UA_Variant_setScalar(&createSubData[4].value, &sub->maxKeepAliveCount, |
85 | 0 | &UA_TYPES[UA_TYPES_UINT32]); |
86 | 0 | UA_Variant_setScalar(&createSubData[5].value, &sub->notificationsPerPublish, |
87 | 0 | &UA_TYPES[UA_TYPES_UINT32]); |
88 | 0 | UA_Variant_setScalar(&createSubData[6].value, &sub->priority, |
89 | 0 | &UA_TYPES[UA_TYPES_BYTE]); |
90 | 0 | UA_Variant_setScalar(&createSubData[7].value, &enabled, |
91 | 0 | &UA_TYPES[UA_TYPES_BOOLEAN]); |
92 | |
|
93 | 0 | if(server->config.subscriptionNotificationCallback) |
94 | 0 | server->config.subscriptionNotificationCallback(server, type, createSubMap); |
95 | 0 | if(server->config.globalNotificationCallback) |
96 | 0 | server->config.globalNotificationCallback(server, type, createSubMap); |
97 | 0 | } |
98 | | |
99 | | UA_Boolean |
100 | | Service_CreateSubscription(UA_Server *server, UA_Session *session, |
101 | | const UA_CreateSubscriptionRequest *request, |
102 | 0 | UA_CreateSubscriptionResponse *response) { |
103 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
104 | | |
105 | | /* Check limits for the number of subscriptions */ |
106 | 0 | if(((server->config.maxSubscriptions != 0) && |
107 | 0 | (server->subscriptionsSize >= server->config.maxSubscriptions)) || |
108 | 0 | ((server->config.maxSubscriptionsPerSession != 0) && |
109 | 0 | (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession))) { |
110 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS; |
111 | 0 | return true; |
112 | 0 | } |
113 | | |
114 | | /* Create the subscription */ |
115 | 0 | UA_Subscription *sub = UA_Subscription_new(); |
116 | 0 | if(!sub) { |
117 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
118 | 0 | "Processing CreateSubscriptionRequest failed"); |
119 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
120 | 0 | return true; |
121 | 0 | } |
122 | | |
123 | | /* Set the subscription parameters */ |
124 | 0 | setSubscriptionSettings(server, sub, request->requestedPublishingInterval, |
125 | 0 | request->requestedLifetimeCount, |
126 | 0 | request->requestedMaxKeepAliveCount, |
127 | 0 | request->maxNotificationsPerPublish, request->priority); |
128 | 0 | sub->subscriptionId = ++server->lastSubscriptionId; /* Assign the SubscriptionId */ |
129 | | |
130 | | /* Register the subscription in the server */ |
131 | 0 | LIST_INSERT_HEAD(&server->subscriptions, sub, serverListEntry); |
132 | 0 | server->subscriptionsSize++; |
133 | | |
134 | | /* Update the server statistics */ |
135 | 0 | server->serverDiagnosticsSummary.currentSubscriptionCount++; |
136 | 0 | server->serverDiagnosticsSummary.cumulatedSubscriptionCount++; |
137 | | |
138 | | /* Attach the Subscription to the session */ |
139 | 0 | UA_Session_attachSubscription(session, sub); |
140 | | |
141 | | /* Create representation in the Session object */ |
142 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
143 | 0 | createSubscriptionObject(server, session, sub); |
144 | 0 | #endif |
145 | | |
146 | | /* Set the subscription state. This also registers the callback. |
147 | | * Note that also a disabled subscription publishes keepalives. */ |
148 | 0 | UA_SubscriptionState sState = (request->publishingEnabled) ? |
149 | 0 | UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH; |
150 | 0 | UA_StatusCode res = Subscription_setState(server, sub, sState); |
151 | 0 | if(res != UA_STATUSCODE_GOOD) { |
152 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, sub->session, |
153 | 0 | "Subscription %" PRIu32 " | Could not register " |
154 | 0 | "publish callback with error code %s", |
155 | 0 | sub->subscriptionId, UA_StatusCode_name(res)); |
156 | 0 | response->responseHeader.serviceResult = res; |
157 | 0 | UA_Subscription_delete(server, sub); |
158 | 0 | return true; |
159 | 0 | } |
160 | | |
161 | 0 | UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub, |
162 | 0 | "Subscription created (Publishing interval %.2fms, " |
163 | 0 | "max %lu notifications per publish)", |
164 | 0 | sub->publishingInterval, |
165 | 0 | (long unsigned)sub->notificationsPerPublish); |
166 | | |
167 | | /* Notify the application */ |
168 | 0 | notifySubscription(server, sub, |
169 | 0 | UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_CREATED); |
170 | | |
171 | | /* Prepare the response */ |
172 | 0 | response->subscriptionId = sub->subscriptionId; |
173 | 0 | response->revisedPublishingInterval = sub->publishingInterval; |
174 | 0 | response->revisedLifetimeCount = sub->lifeTimeCount; |
175 | 0 | response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount; |
176 | |
|
177 | 0 | return true; |
178 | 0 | } |
179 | | |
180 | | UA_Boolean |
181 | | Service_ModifySubscription(UA_Server *server, UA_Session *session, |
182 | | const UA_ModifySubscriptionRequest *request, |
183 | 0 | UA_ModifySubscriptionResponse *response) { |
184 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
185 | 0 | "Processing ModifySubscriptionRequest"); |
186 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
187 | |
|
188 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId); |
189 | 0 | if(!sub) { |
190 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
191 | 0 | return true; |
192 | 0 | } |
193 | | |
194 | | /* Store the old publishing interval */ |
195 | 0 | UA_Double oldPublishingInterval = sub->publishingInterval; |
196 | 0 | UA_Byte oldPriority = sub->priority; |
197 | | |
198 | | /* Change the Subscription settings */ |
199 | 0 | setSubscriptionSettings(server, sub, request->requestedPublishingInterval, |
200 | 0 | request->requestedLifetimeCount, |
201 | 0 | request->requestedMaxKeepAliveCount, |
202 | 0 | request->maxNotificationsPerPublish, request->priority); |
203 | | |
204 | | /* Reset the subscription lifetime */ |
205 | 0 | Subscription_resetLifetime(sub); |
206 | | |
207 | | /* The publish interval has changed */ |
208 | 0 | if(sub->publishingInterval != oldPublishingInterval) { |
209 | | /* Change the repeated callback to the new interval. This cannot fail as |
210 | | * memory is reused. */ |
211 | 0 | if(sub->publishCallbackId > 0) |
212 | 0 | changeRepeatedCallbackInterval(server, sub->publishCallbackId, |
213 | 0 | sub->publishingInterval); |
214 | | |
215 | | /* For each MonitoredItem check if it was/shall be attached to the |
216 | | * publish interval. This ensures that we have less cyclic callbacks |
217 | | * registered and that the notifications are fresh. */ |
218 | 0 | UA_MonitoredItem *mon; |
219 | 0 | LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
220 | 0 | if(mon->parameters.samplingInterval == sub->publishingInterval || |
221 | 0 | mon->parameters.samplingInterval == oldPublishingInterval) { |
222 | 0 | UA_MonitoredItem_unregisterSampling(server, mon); |
223 | 0 | UA_MonitoredItem_registerSampling(server, mon); |
224 | 0 | } |
225 | 0 | } |
226 | 0 | } |
227 | | |
228 | | /* If the priority has changed, re-enter the subscription to the |
229 | | * priority-ordered queue in the session. */ |
230 | 0 | if(oldPriority != sub->priority) { |
231 | 0 | UA_Session_detachSubscription(server, session, sub, false); |
232 | 0 | UA_Session_attachSubscription(session, sub); |
233 | 0 | } |
234 | | |
235 | | /* Notify the application */ |
236 | 0 | notifySubscription(server, sub, |
237 | 0 | UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_MODIFIED); |
238 | | |
239 | | /* Set the response */ |
240 | 0 | response->revisedPublishingInterval = sub->publishingInterval; |
241 | 0 | response->revisedLifetimeCount = sub->lifeTimeCount; |
242 | 0 | response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount; |
243 | | |
244 | | /* Update the diagnostics statistics */ |
245 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
246 | 0 | sub->modifyCount++; |
247 | 0 | #endif |
248 | |
|
249 | 0 | return true; |
250 | 0 | } |
251 | | |
252 | | static void |
253 | | Operation_SetPublishingMode(UA_Server *server, UA_Session *session, |
254 | | const UA_Boolean *publishingEnabled, |
255 | | const UA_UInt32 *subscriptionId, |
256 | 0 | UA_StatusCode *result) { |
257 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
258 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId); |
259 | 0 | if(!sub) { |
260 | 0 | *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
261 | 0 | return; |
262 | 0 | } |
263 | | |
264 | | /* Enable/disable */ |
265 | 0 | UA_SubscriptionState sState = (*publishingEnabled) ? |
266 | 0 | UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH; |
267 | 0 | *result = Subscription_setState(server, sub, sState); |
268 | | |
269 | | /* Reset the lifetime counter */ |
270 | 0 | Subscription_resetLifetime(sub); |
271 | | |
272 | | /* Notify the application */ |
273 | 0 | notifySubscription(server, sub, |
274 | 0 | UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_PUBLISHINGMODE); |
275 | 0 | } |
276 | | |
277 | | UA_Boolean |
278 | | Service_SetPublishingMode(UA_Server *server, UA_Session *session, |
279 | | const UA_SetPublishingModeRequest *request, |
280 | 0 | UA_SetPublishingModeResponse *response) { |
281 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
282 | 0 | "Processing SetPublishingModeRequest"); |
283 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
284 | |
|
285 | 0 | UA_Boolean publishingEnabled = request->publishingEnabled; /* request is const */ |
286 | 0 | response->responseHeader.serviceResult = |
287 | 0 | allocProcessServiceOperations(server, session, |
288 | 0 | (UA_ServiceOperation)Operation_SetPublishingMode, |
289 | 0 | &publishingEnabled, &request->subscriptionIdsSize, |
290 | 0 | &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize, |
291 | 0 | &UA_TYPES[UA_TYPES_STATUSCODE]); |
292 | |
|
293 | 0 | return true; |
294 | 0 | } |
295 | | |
296 | | UA_Boolean |
297 | | Service_Publish(UA_Server *server, UA_Session *session, |
298 | | const UA_PublishRequest *request, |
299 | 0 | UA_PublishResponse *response) { |
300 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
301 | 0 | "Processing PublishRequest with RequestId %u", |
302 | 0 | server->asyncManager.currentRequestId); |
303 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
304 | | |
305 | | /* Return an error if the session has no subscription */ |
306 | 0 | if(TAILQ_EMPTY(&session->subscriptions)) { |
307 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION; |
308 | 0 | return true; |
309 | 0 | } |
310 | | |
311 | | /* Handle too many subscriptions to free resources before trying to allocate |
312 | | * resources for the new publish request. If the limit has been reached the |
313 | | * oldest publish request are returned with an error message. */ |
314 | 0 | UA_Session_ensurePublishQueueSpace(server, session); |
315 | | |
316 | | /* Allocate the response to store it in the retransmission queue */ |
317 | 0 | UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *) |
318 | 0 | UA_malloc(sizeof(UA_PublishResponseEntry)); |
319 | 0 | if(!entry) { |
320 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
321 | 0 | return true; |
322 | 0 | } |
323 | | |
324 | | /* Prepare the response */ |
325 | 0 | UA_PublishResponse *entry_response = &entry->response; |
326 | 0 | UA_PublishResponse_init(entry_response); |
327 | | |
328 | | /* Allocate the results array to acknowledge the acknowledge */ |
329 | 0 | if(request->subscriptionAcknowledgementsSize > 0) { |
330 | 0 | entry_response->results = (UA_StatusCode *) |
331 | 0 | UA_Array_new(request->subscriptionAcknowledgementsSize, |
332 | 0 | &UA_TYPES[UA_TYPES_STATUSCODE]); |
333 | 0 | if(!entry_response->results) { |
334 | 0 | UA_free(entry); |
335 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
336 | 0 | return true; |
337 | 0 | } |
338 | 0 | entry_response->resultsSize = request->subscriptionAcknowledgementsSize; |
339 | 0 | } |
340 | | |
341 | | /* <--- Async response from here on ---> */ |
342 | | |
343 | 0 | entry->requestId = server->asyncManager.currentRequestId; |
344 | 0 | entry_response->responseHeader.requestHandle = request->requestHeader.requestHandle; |
345 | | |
346 | | /* Delete Acknowledged Subscription Messages */ |
347 | 0 | for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) { |
348 | 0 | UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i]; |
349 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId); |
350 | 0 | if(!sub) { |
351 | 0 | entry_response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
352 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
353 | 0 | "Cannot process acknowledgements subscription %u" PRIu32, |
354 | 0 | ack->subscriptionId); |
355 | 0 | continue; |
356 | 0 | } |
357 | | /* Remove the acked transmission from the retransmission queue */ |
358 | 0 | entry_response->results[i] = |
359 | 0 | UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber); |
360 | 0 | } |
361 | | |
362 | | /* Set the maxTime if a timeout hint is defined */ |
363 | 0 | entry->maxTime = UA_INT64_MAX; |
364 | 0 | if(request->requestHeader.timeoutHint > 0) { |
365 | 0 | UA_EventLoop *el = server->config.eventLoop; |
366 | 0 | entry->maxTime = el->dateTime_nowMonotonic(el) + |
367 | 0 | (request->requestHeader.timeoutHint * UA_DATETIME_MSEC); |
368 | 0 | } |
369 | | |
370 | | /* Queue the publish response. It will be dequeued in a repeated publish |
371 | | * callback. This can also be triggered right now for a late |
372 | | * subscription. */ |
373 | 0 | UA_Session_queuePublishReq(session, entry, false); |
374 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, "Queued a publication message"); |
375 | | |
376 | | /* If there are late subscriptions, the new publish request is used to |
377 | | * answer them immediately. Late subscriptions with higher priority are |
378 | | * considered earlier. However, a single subscription that generates many |
379 | | * notifications must not "starve" other late subscriptions. Hence we move |
380 | | * it to the end of the queue for the subscriptions of that priority. */ |
381 | 0 | UA_Subscription *late, *late_tmp; |
382 | 0 | TAILQ_FOREACH_SAFE(late, &session->subscriptions, sessionListEntry, late_tmp) { |
383 | | /* Skip non-late subscriptions */ |
384 | 0 | if(!late->late) |
385 | 0 | continue; |
386 | | |
387 | | /* Call publish on the late subscription */ |
388 | 0 | UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, late, |
389 | 0 | "Send PublishResponse on a late subscription"); |
390 | 0 | UA_Subscription_publish(server, late); |
391 | | |
392 | | /* Skip re-insert if the subscription was deleted or deactivated during |
393 | | * _publish */ |
394 | 0 | if(late->state >= UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH) { |
395 | | /* Find the first element with smaller priority and insert before |
396 | | * that. If there is none, insert at the end of the queue. */ |
397 | 0 | UA_Subscription *after = TAILQ_NEXT(late, sessionListEntry); |
398 | 0 | while(after && after->priority >= late->priority) |
399 | 0 | after = TAILQ_NEXT(after, sessionListEntry); |
400 | 0 | TAILQ_REMOVE(&session->subscriptions, late, sessionListEntry); |
401 | 0 | if(after) |
402 | 0 | TAILQ_INSERT_BEFORE(after, late, sessionListEntry); |
403 | 0 | else |
404 | 0 | TAILQ_INSERT_TAIL(&session->subscriptions, late, sessionListEntry); |
405 | 0 | } |
406 | | |
407 | | /* Responses left in the queue? */ |
408 | 0 | if(session->responseQueueSize == 0) |
409 | 0 | break; |
410 | 0 | } |
411 | |
|
412 | 0 | return false; |
413 | 0 | } |
414 | | |
415 | | static void |
416 | | Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_, |
417 | 0 | const UA_UInt32 *subscriptionId, UA_StatusCode *result) { |
418 | | /* Find the Subscription */ |
419 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId); |
420 | 0 | if(!sub) { |
421 | 0 | *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
422 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
423 | 0 | "Deleting Subscription with Id %" PRIu32 |
424 | 0 | " failed with error code %s", |
425 | 0 | *subscriptionId, UA_StatusCode_name(*result)); |
426 | 0 | return; |
427 | 0 | } |
428 | | |
429 | | /* Notify the application */ |
430 | 0 | notifySubscription(server, sub, |
431 | 0 | UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_DELETED); |
432 | | |
433 | | /* Delete the Subscription */ |
434 | 0 | UA_Subscription_delete(server, sub); |
435 | 0 | *result = UA_STATUSCODE_GOOD; |
436 | |
|
437 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
438 | 0 | "Subscription %" PRIu32 " | Subscription deleted", |
439 | 0 | *subscriptionId); |
440 | 0 | } |
441 | | |
442 | | UA_Boolean |
443 | | Service_DeleteSubscriptions(UA_Server *server, UA_Session *session, |
444 | | const UA_DeleteSubscriptionsRequest *request, |
445 | 0 | UA_DeleteSubscriptionsResponse *response) { |
446 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
447 | 0 | "Processing DeleteSubscriptionsRequest"); |
448 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
449 | |
|
450 | 0 | response->responseHeader.serviceResult = |
451 | 0 | allocProcessServiceOperations(server, session, |
452 | 0 | (UA_ServiceOperation)Operation_DeleteSubscription, |
453 | 0 | NULL, &request->subscriptionIdsSize, |
454 | 0 | &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize, |
455 | 0 | &UA_TYPES[UA_TYPES_STATUSCODE]); |
456 | 0 | return true; |
457 | 0 | } |
458 | | |
459 | | UA_Boolean |
460 | | Service_Republish(UA_Server *server, UA_Session *session, |
461 | | const UA_RepublishRequest *request, |
462 | 0 | UA_RepublishResponse *response) { |
463 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
464 | 0 | "Processing RepublishRequest"); |
465 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
466 | | |
467 | | /* Get the subscription */ |
468 | 0 | UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId); |
469 | 0 | if(!sub) { |
470 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
471 | 0 | return true; |
472 | 0 | } |
473 | | |
474 | | /* Reset the lifetime counter */ |
475 | 0 | Subscription_resetLifetime(sub); |
476 | | |
477 | | /* Update the subscription statistics */ |
478 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
479 | 0 | sub->republishRequestCount++; |
480 | 0 | #endif |
481 | | |
482 | | /* Find the notification in the retransmission queue */ |
483 | 0 | UA_NotificationMessageEntry *entry; |
484 | 0 | TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
485 | 0 | if(entry->message.sequenceNumber == request->retransmitSequenceNumber) |
486 | 0 | break; |
487 | 0 | } |
488 | 0 | if(!entry) { |
489 | 0 | response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE; |
490 | 0 | return true; |
491 | 0 | } |
492 | | |
493 | 0 | response->responseHeader.serviceResult = |
494 | 0 | UA_NotificationMessage_copy(&entry->message, &response->notificationMessage); |
495 | | |
496 | | /* Update the subscription statistics for the case where we return a message */ |
497 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
498 | 0 | sub->republishMessageCount++; |
499 | 0 | #endif |
500 | |
|
501 | 0 | return true; |
502 | 0 | } |
503 | | |
504 | | static UA_StatusCode |
505 | 0 | setTransferredSequenceNumbers(const UA_Subscription *sub, UA_TransferResult *result) { |
506 | | /* Allocate memory */ |
507 | 0 | result->availableSequenceNumbers = (UA_UInt32*) |
508 | 0 | UA_Array_new(sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]); |
509 | 0 | if(!result->availableSequenceNumbers) |
510 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
511 | 0 | result->availableSequenceNumbersSize = sub->retransmissionQueueSize; |
512 | | |
513 | | /* Copy over the sequence numbers */ |
514 | 0 | UA_NotificationMessageEntry *entry; |
515 | 0 | size_t i = 0; |
516 | 0 | TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
517 | 0 | result->availableSequenceNumbers[i] = entry->message.sequenceNumber; |
518 | 0 | i++; |
519 | 0 | } |
520 | |
|
521 | 0 | UA_assert(i == result->availableSequenceNumbersSize); |
522 | | |
523 | 0 | return UA_STATUSCODE_GOOD; |
524 | 0 | } |
525 | | |
526 | | static void |
527 | | Operation_TransferSubscription(UA_Server *server, UA_Session *session, |
528 | | const UA_Boolean *sendInitialValues, |
529 | | const UA_UInt32 *subscriptionId, |
530 | 0 | UA_TransferResult *result) { |
531 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
532 | | |
533 | | /* Get the subscription. This requires a server-wide lookup instead of the |
534 | | * usual session-wide lookup. */ |
535 | 0 | UA_Subscription *sub = getSubscriptionById(server, *subscriptionId); |
536 | 0 | if(!sub) { |
537 | 0 | result->statusCode = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
538 | 0 | return; |
539 | 0 | } |
540 | | |
541 | | /* Update the diagnostics statistics */ |
542 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
543 | 0 | sub->transferRequestCount++; |
544 | 0 | #endif |
545 | | |
546 | | /* Is this the same session? Return the sequence numbers and do nothing else. */ |
547 | 0 | UA_Session *oldSession = sub->session; |
548 | 0 | if(oldSession == session) { |
549 | 0 | result->statusCode = setTransferredSequenceNumbers(sub, result); |
550 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
551 | 0 | sub->transferredToSameClientCount++; |
552 | 0 | #endif |
553 | 0 | return; |
554 | 0 | } |
555 | | |
556 | | /* Check with AccessControl if the transfer is allowed */ |
557 | 0 | if(server->config.accessControl.allowTransferSubscription) { |
558 | 0 | if(!server->config.accessControl. |
559 | 0 | allowTransferSubscription(server, &server->config.accessControl, |
560 | 0 | oldSession ? &oldSession->sessionId : NULL, |
561 | 0 | oldSession ? oldSession->context : NULL, |
562 | 0 | &session->sessionId, session->context)) { |
563 | 0 | result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED; |
564 | 0 | return; |
565 | 0 | } |
566 | 0 | } else { |
567 | 0 | result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED; |
568 | 0 | return; |
569 | 0 | } |
570 | | |
571 | | /* Check limits for the number of subscriptions for this Session */ |
572 | 0 | if((server->config.maxSubscriptionsPerSession != 0) && |
573 | 0 | (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession)) { |
574 | 0 | result->statusCode = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS; |
575 | 0 | return; |
576 | 0 | } |
577 | | |
578 | | /* Allocate memory for the new subscription */ |
579 | 0 | UA_Subscription *newSub = (UA_Subscription*)UA_malloc(sizeof(UA_Subscription)); |
580 | 0 | if(!newSub) { |
581 | 0 | result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY; |
582 | 0 | return; |
583 | 0 | } |
584 | | |
585 | | /* Set the available sequence numbers */ |
586 | 0 | result->statusCode = setTransferredSequenceNumbers(sub, result); |
587 | 0 | if(result->statusCode != UA_STATUSCODE_GOOD) { |
588 | 0 | UA_free(newSub); |
589 | 0 | return; |
590 | 0 | } |
591 | | |
592 | | /* Create an identical copy of the Subscription struct. The original |
593 | | * subscription remains in place until a StatusChange notification has been |
594 | | * sent. The elements for lists and queues are moved over manually to ensure |
595 | | * that all backpointers are set correctly. */ |
596 | 0 | memcpy(newSub, sub, sizeof(UA_Subscription)); |
597 | | |
598 | | /* Set to the same state as the original subscription */ |
599 | 0 | newSub->publishCallbackId = 0; |
600 | 0 | result->statusCode = Subscription_setState(server, newSub, sub->state); |
601 | 0 | if(result->statusCode != UA_STATUSCODE_GOOD) { |
602 | 0 | UA_Array_delete(result->availableSequenceNumbers, |
603 | 0 | sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]); |
604 | 0 | result->availableSequenceNumbers = NULL; |
605 | 0 | result->availableSequenceNumbersSize = 0; |
606 | 0 | UA_free(newSub); |
607 | 0 | return; |
608 | 0 | } |
609 | | |
610 | | /* <-- The point of no return --> */ |
611 | | |
612 | | /* Move over the MonitoredItems and adjust the backpointers */ |
613 | 0 | LIST_INIT(&newSub->monitoredItems); |
614 | 0 | UA_MonitoredItem *mon, *mon_tmp; |
615 | 0 | LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) { |
616 | 0 | LIST_REMOVE(mon, listEntry); |
617 | 0 | mon->subscription = newSub; |
618 | 0 | LIST_INSERT_HEAD(&newSub->monitoredItems, mon, listEntry); |
619 | 0 | } |
620 | 0 | sub->monitoredItemsSize = 0; |
621 | | |
622 | | /* Move over the notification queue */ |
623 | 0 | TAILQ_INIT(&newSub->notificationQueue); |
624 | 0 | UA_Notification *nn, *nn_tmp; |
625 | 0 | TAILQ_FOREACH_SAFE(nn, &sub->notificationQueue, subEntry, nn_tmp) { |
626 | 0 | TAILQ_REMOVE(&sub->notificationQueue, nn, subEntry); |
627 | 0 | TAILQ_INSERT_TAIL(&newSub->notificationQueue, nn, subEntry); |
628 | 0 | } |
629 | 0 | sub->notificationQueueSize = 0; |
630 | 0 | sub->dataChangeNotifications = 0; |
631 | 0 | sub->eventNotifications = 0; |
632 | |
|
633 | 0 | TAILQ_INIT(&newSub->retransmissionQueue); |
634 | 0 | UA_NotificationMessageEntry *nme, *nme_tmp; |
635 | 0 | TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) { |
636 | 0 | TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry); |
637 | 0 | TAILQ_INSERT_TAIL(&newSub->retransmissionQueue, nme, listEntry); |
638 | 0 | if(oldSession) |
639 | 0 | oldSession->totalRetransmissionQueueSize -= 1; |
640 | 0 | sub->retransmissionQueueSize -= 1; |
641 | 0 | } |
642 | 0 | UA_assert(sub->retransmissionQueueSize == 0); |
643 | 0 | sub->retransmissionQueueSize = 0; |
644 | | |
645 | | /* Add to the server */ |
646 | 0 | UA_assert(newSub->subscriptionId == sub->subscriptionId); |
647 | 0 | LIST_INSERT_HEAD(&server->subscriptions, newSub, serverListEntry); |
648 | 0 | server->subscriptionsSize++; |
649 | | |
650 | | /* Attach to the session */ |
651 | 0 | UA_Session_attachSubscription(session, newSub); |
652 | | |
653 | | /* Notify the application */ |
654 | 0 | notifySubscription(server, newSub, |
655 | 0 | UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_TRANSFERRED); |
656 | |
|
657 | 0 | UA_LOG_INFO_SUBSCRIPTION(server->config.logging, newSub, |
658 | 0 | "Transferred to this Session"); |
659 | | |
660 | | /* Set StatusChange in the original subscription and force publish. This |
661 | | * also removes the Subscription, even if there was no PublishResponse |
662 | | * queued to send a StatusChangeNotification. */ |
663 | 0 | sub->statusChange = UA_STATUSCODE_GOODSUBSCRIPTIONTRANSFERRED; |
664 | 0 | UA_Subscription_publish(server, sub); |
665 | | |
666 | | /* Re-create notifications with the current values for the new subscription */ |
667 | 0 | if(*sendInitialValues) |
668 | 0 | UA_Subscription_resendData(server, newSub); |
669 | | |
670 | | /* Do not update the statistics for the number of Subscriptions here. The |
671 | | * fact that we duplicate the subscription and move over the content is just |
672 | | * an implementtion detail. |
673 | | * server->serverDiagnosticsSummary.currentSubscriptionCount++; |
674 | | * server->serverDiagnosticsSummary.cumulatedSubscriptionCount++; |
675 | | * |
676 | | * Update the diagnostics statistics: */ |
677 | 0 | #ifdef UA_ENABLE_DIAGNOSTICS |
678 | 0 | if(oldSession && |
679 | 0 | UA_equal(&oldSession->clientDescription, &session->clientDescription, |
680 | 0 | &UA_TYPES[UA_TYPES_APPLICATIONDESCRIPTION])) |
681 | 0 | sub->transferredToSameClientCount++; |
682 | 0 | else |
683 | 0 | sub->transferredToAltClientCount++; |
684 | 0 | #endif |
685 | 0 | } |
686 | | |
687 | | UA_Boolean |
688 | | Service_TransferSubscriptions(UA_Server *server, UA_Session *session, |
689 | | const UA_TransferSubscriptionsRequest *request, |
690 | 0 | UA_TransferSubscriptionsResponse *response) { |
691 | 0 | UA_LOG_DEBUG_SESSION(server->config.logging, session, |
692 | 0 | "Processing TransferSubscriptionsRequest"); |
693 | 0 | UA_LOCK_ASSERT(&server->serviceMutex); |
694 | |
|
695 | 0 | response->responseHeader.serviceResult = |
696 | 0 | allocProcessServiceOperations(server, session, |
697 | 0 | (UA_ServiceOperation)Operation_TransferSubscription, |
698 | 0 | &request->sendInitialValues, |
699 | 0 | &request->subscriptionIdsSize, |
700 | 0 | &UA_TYPES[UA_TYPES_UINT32], |
701 | 0 | &response->resultsSize, |
702 | 0 | &UA_TYPES[UA_TYPES_TRANSFERRESULT]); |
703 | | return true; |
704 | 0 | } |
705 | | |
706 | | #endif /* UA_ENABLE_SUBSCRIPTIONS */ |