/src/open62541/src/pubsub/ua_pubsub_manager.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright (c) 2017-2025 Fraunhofer IOSB (Author: Andreas Ebner) |
6 | | * Copyright (c) 2018 Fraunhofer IOSB (Author: Julius Pfrommer) |
7 | | * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes) |
8 | | * Copyright (c) 2022 Siemens AG (Author: Thomas Fischer) |
9 | | * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf) |
10 | | * Copyright (c) 2022 Linutronix GmbH (Author: Muddasir Shakil) |
11 | | */ |
12 | | |
13 | | #include "ua_pubsub_internal.h" |
14 | | |
15 | | #ifdef UA_ENABLE_PUBSUB /* conditional compilation */ |
16 | | |
17 | | #ifdef UA_ENABLE_PUBSUB_SKS |
18 | | #include "ua_pubsub_keystorage.h" |
19 | | #endif |
20 | | |
21 | 0 | #define UA_DATETIMESTAMP_2000 125911584000000000 |
22 | 0 | #define UA_RESERVEID_FIRST_ID 0x8000 |
23 | | |
24 | | static const char *pubSubStateNames[6] = { |
25 | | "Disabled", "Paused", "Operational", "Error", "PreOperational", "Invalid" |
26 | | }; |
27 | | |
28 | | static void |
29 | | UA_PubSubManager_stop(UA_ServerComponent *sc); |
30 | | |
31 | | static UA_StatusCode |
32 | | UA_PubSubManager_start(UA_ServerComponent *sc, UA_Server *server); |
33 | | |
34 | | const char * |
35 | 0 | UA_PubSubState_name(UA_PubSubState state) { |
36 | 0 | if(state < UA_PUBSUBSTATE_DISABLED || state > UA_PUBSUBSTATE_PREOPERATIONAL) |
37 | 0 | return pubSubStateNames[5]; |
38 | 0 | return pubSubStateNames[state]; |
39 | 0 | } |
40 | | |
41 | | void |
42 | 0 | UA_PubSubComponentHead_clear(UA_PubSubComponentHead *psch) { |
43 | 0 | UA_NodeId_clear(&psch->identifier); |
44 | 0 | UA_String_clear(&psch->logIdString); |
45 | 0 | memset(psch, 0, sizeof(UA_PubSubComponentHead)); |
46 | 0 | } |
47 | | |
48 | | UA_StatusCode |
49 | | UA_PublisherId_copy(const UA_PublisherId *src, |
50 | 0 | UA_PublisherId *dst) { |
51 | 0 | memcpy(dst, src, sizeof(UA_PublisherId)); |
52 | 0 | if(src->idType == UA_PUBLISHERIDTYPE_STRING) |
53 | 0 | return UA_String_copy(&src->id.string, &dst->id.string); |
54 | 0 | return UA_STATUSCODE_GOOD; |
55 | 0 | } |
56 | | |
57 | | void |
58 | 0 | UA_PublisherId_clear(UA_PublisherId *p) { |
59 | 0 | if(p->idType == UA_PUBLISHERIDTYPE_STRING) |
60 | 0 | UA_String_clear(&p->id.string); |
61 | 0 | memset(p, 0, sizeof(UA_PublisherId)); |
62 | 0 | } |
63 | | |
64 | | UA_StatusCode |
65 | 0 | UA_PublisherId_fromVariant(UA_PublisherId *p, const UA_Variant *src) { |
66 | 0 | if(!UA_Variant_isScalar(src)) |
67 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
68 | | |
69 | 0 | memset(p, 0, sizeof(UA_PublisherId)); |
70 | |
|
71 | 0 | const void *data = (const void*)src->data; |
72 | 0 | if(src->type == &UA_TYPES[UA_TYPES_BYTE]) { |
73 | 0 | p->idType = UA_PUBLISHERIDTYPE_BYTE; |
74 | 0 | p->id.byte = *(const UA_Byte*)data; |
75 | 0 | } else if(src->type == &UA_TYPES[UA_TYPES_UINT16]) { |
76 | 0 | p->idType = UA_PUBLISHERIDTYPE_UINT16; |
77 | 0 | p->id.uint16 = *(const UA_UInt16*)data; |
78 | 0 | } else if(src->type == &UA_TYPES[UA_TYPES_UINT32]) { |
79 | 0 | p->idType = UA_PUBLISHERIDTYPE_UINT32; |
80 | 0 | p->id.uint32 = *(const UA_UInt32*)data; |
81 | 0 | } else if(src->type == &UA_TYPES[UA_TYPES_UINT64]) { |
82 | 0 | p->idType = UA_PUBLISHERIDTYPE_UINT64; |
83 | 0 | p->id.uint64 = *(const UA_UInt64*)data; |
84 | 0 | } else if(src->type == &UA_TYPES[UA_TYPES_STRING]) { |
85 | 0 | p->idType = UA_PUBLISHERIDTYPE_STRING; |
86 | 0 | return UA_String_copy((const UA_String *)data, &p->id.string); |
87 | 0 | } else { |
88 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
89 | 0 | } |
90 | 0 | return UA_STATUSCODE_GOOD; |
91 | 0 | } |
92 | | |
93 | | void |
94 | 0 | UA_PublisherId_toVariant(const UA_PublisherId *p, UA_Variant *dst) { |
95 | 0 | UA_PublisherId *p2 = (UA_PublisherId*)(uintptr_t)p; |
96 | 0 | switch(p->idType) { |
97 | 0 | case UA_PUBLISHERIDTYPE_BYTE: |
98 | 0 | UA_Variant_setScalar(dst, &p2->id.byte, &UA_TYPES[UA_TYPES_BYTE]); break; |
99 | 0 | case UA_PUBLISHERIDTYPE_UINT16: |
100 | 0 | UA_Variant_setScalar(dst, &p2->id.uint16, &UA_TYPES[UA_TYPES_UINT16]); break; |
101 | 0 | case UA_PUBLISHERIDTYPE_UINT32: |
102 | 0 | UA_Variant_setScalar(dst, &p2->id.uint32, &UA_TYPES[UA_TYPES_UINT32]); break; |
103 | 0 | case UA_PUBLISHERIDTYPE_UINT64: |
104 | 0 | UA_Variant_setScalar(dst, &p2->id.uint64, &UA_TYPES[UA_TYPES_UINT64]); break; |
105 | 0 | case UA_PUBLISHERIDTYPE_STRING: |
106 | 0 | UA_Variant_setScalar(dst, &p2->id.string, &UA_TYPES[UA_TYPES_STRING]); break; |
107 | 0 | default: break; /* This is not possible if the PublisherId is well-defined */ |
108 | 0 | } |
109 | 0 | } |
110 | | |
111 | | UA_ConnectionManager * |
112 | 0 | getCM(UA_EventLoop *el, UA_String protocol) { |
113 | 0 | for(UA_EventSource *es = el->eventSources; es != NULL; es = es->next) { |
114 | 0 | if(es->eventSourceType != UA_EVENTSOURCETYPE_CONNECTIONMANAGER) |
115 | 0 | continue; |
116 | 0 | UA_ConnectionManager *cm = (UA_ConnectionManager*)es; |
117 | 0 | if(UA_String_equal(&protocol, &cm->protocol)) |
118 | 0 | return cm; |
119 | 0 | } |
120 | 0 | return NULL; |
121 | 0 | } |
122 | | |
123 | | static enum ZIP_CMP |
124 | 0 | cmpReserveId(const void *a, const void *b) { |
125 | 0 | const UA_ReserveId *aa = (const UA_ReserveId*)a; |
126 | 0 | const UA_ReserveId *bb = (const UA_ReserveId*)b; |
127 | 0 | if(aa->id != bb->id) |
128 | 0 | return (aa->id < bb->id) ? ZIP_CMP_LESS : ZIP_CMP_MORE; |
129 | 0 | if(aa->reserveIdType != bb->reserveIdType) |
130 | 0 | return (aa->reserveIdType < bb->reserveIdType) ? ZIP_CMP_LESS : ZIP_CMP_MORE; |
131 | 0 | return (enum ZIP_CMP)UA_order(&aa->transportProfileUri, |
132 | 0 | &bb->transportProfileUri, &UA_TYPES[UA_TYPES_STRING]); |
133 | 0 | } |
134 | | |
135 | | ZIP_FUNCTIONS(UA_ReserveIdTree, UA_ReserveId, treeEntry, UA_ReserveId, id, cmpReserveId) |
136 | | |
137 | | static UA_ReserveId * |
138 | | UA_ReserveId_new(UA_UInt16 id, UA_String transportProfileUri, |
139 | 0 | UA_ReserveIdType reserveIdType, UA_NodeId sessionId) { |
140 | 0 | UA_ReserveId *reserveId = (UA_ReserveId *)UA_calloc(1, sizeof(UA_ReserveId)); |
141 | 0 | if(!reserveId) |
142 | 0 | return NULL; |
143 | 0 | reserveId->id = id; |
144 | 0 | reserveId->reserveIdType = reserveIdType; |
145 | 0 | UA_String_copy(&transportProfileUri, &reserveId->transportProfileUri); |
146 | 0 | reserveId->sessionId = sessionId; |
147 | 0 | return reserveId; |
148 | 0 | } |
149 | | |
150 | | static UA_Boolean |
151 | | UA_ReserveId_isFree(UA_PubSubManager *psm, UA_UInt16 id, UA_String transportProfileUri, |
152 | 0 | UA_ReserveIdType reserveIdType) { |
153 | | /* Is the id already in use? */ |
154 | 0 | UA_ReserveId compare; |
155 | 0 | compare.id = id; |
156 | 0 | compare.reserveIdType = reserveIdType; |
157 | 0 | compare.transportProfileUri = transportProfileUri; |
158 | 0 | if(ZIP_FIND(UA_ReserveIdTree, &psm->reserveIds, &compare)) |
159 | 0 | return false; |
160 | | |
161 | 0 | UA_PubSubConnection *tmpConnection; |
162 | 0 | TAILQ_FOREACH(tmpConnection, &psm->connections, listEntry) { |
163 | 0 | UA_WriterGroup *writerGroup; |
164 | 0 | LIST_FOREACH(writerGroup, &tmpConnection->writerGroups, listEntry) { |
165 | 0 | if(reserveIdType == UA_WRITER_GROUP) { |
166 | 0 | if(UA_String_equal(&tmpConnection->config.transportProfileUri, |
167 | 0 | &transportProfileUri) && |
168 | 0 | writerGroup->config.writerGroupId == id) |
169 | 0 | return false; |
170 | | /* reserveIdType == UA_DATA_SET_WRITER */ |
171 | 0 | } else { |
172 | 0 | UA_DataSetWriter *currentWriter; |
173 | 0 | LIST_FOREACH(currentWriter, &writerGroup->writers, listEntry) { |
174 | 0 | if(UA_String_equal(&tmpConnection->config.transportProfileUri, |
175 | 0 | &transportProfileUri) && |
176 | 0 | currentWriter->config.dataSetWriterId == id) |
177 | 0 | return false; |
178 | 0 | } |
179 | 0 | } |
180 | 0 | } |
181 | 0 | } |
182 | 0 | return true; |
183 | 0 | } |
184 | | |
185 | | static UA_UInt16 |
186 | | UA_ReserveId_createId(UA_PubSubManager *psm, UA_NodeId sessionId, |
187 | 0 | UA_String transportProfileUri, UA_ReserveIdType reserveIdType) { |
188 | | /* Total number of possible Ids */ |
189 | 0 | UA_UInt16 numberOfIds = 0x8000; |
190 | | /* Contains next possible free Id */ |
191 | 0 | static UA_UInt16 next_id_writerGroup = UA_RESERVEID_FIRST_ID; |
192 | 0 | static UA_UInt16 next_id_writer = UA_RESERVEID_FIRST_ID; |
193 | 0 | UA_UInt16 next_id; |
194 | 0 | UA_Boolean is_free = false; |
195 | |
|
196 | 0 | if(reserveIdType == UA_WRITER_GROUP) |
197 | 0 | next_id = next_id_writerGroup; |
198 | 0 | else |
199 | 0 | next_id = next_id_writer; |
200 | |
|
201 | 0 | for(;numberOfIds > 0;numberOfIds--) { |
202 | 0 | if(next_id < UA_RESERVEID_FIRST_ID) |
203 | 0 | next_id = UA_RESERVEID_FIRST_ID; |
204 | 0 | is_free = UA_ReserveId_isFree(psm, next_id, transportProfileUri, reserveIdType); |
205 | 0 | if(is_free) |
206 | 0 | break; |
207 | 0 | next_id++; |
208 | 0 | } |
209 | 0 | if(!is_free) { |
210 | 0 | UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB, |
211 | 0 | "PubSub ReserveId creation failed. No free ID could be found."); |
212 | 0 | return 0; |
213 | 0 | } |
214 | | |
215 | 0 | if(reserveIdType == UA_WRITER_GROUP) |
216 | 0 | next_id_writerGroup = (UA_UInt16)(next_id + 1); |
217 | 0 | else |
218 | 0 | next_id_writer = (UA_UInt16)(next_id + 1); |
219 | |
|
220 | 0 | UA_ReserveId *reserveId = |
221 | 0 | UA_ReserveId_new(next_id, transportProfileUri, reserveIdType, sessionId); |
222 | 0 | if(!reserveId) |
223 | 0 | return 0; |
224 | | |
225 | 0 | ZIP_INSERT(UA_ReserveIdTree, &psm->reserveIds, reserveId); |
226 | 0 | psm->reserveIdsSize++; |
227 | 0 | return next_id; |
228 | 0 | } |
229 | | |
230 | | static void * |
231 | 0 | removeReserveId(void *context, UA_ReserveId *elem) { |
232 | 0 | UA_String_clear(&elem->transportProfileUri); |
233 | 0 | UA_free(elem); |
234 | 0 | return NULL; |
235 | 0 | } |
236 | | |
237 | | struct RemoveInactiveReserveIdContext { |
238 | | UA_PubSubManager *psm; |
239 | | UA_ReserveIdTree newTree; |
240 | | }; |
241 | | |
242 | | /* Remove ReserveIds that are not attached to any session */ |
243 | | static void * |
244 | 0 | removeInactiveReserveId(void *context, UA_ReserveId *elem) { |
245 | 0 | struct RemoveInactiveReserveIdContext *ctx = |
246 | 0 | (struct RemoveInactiveReserveIdContext*)context; |
247 | |
|
248 | 0 | if(UA_NodeId_equal(&ctx->psm->sc.server->adminSession.sessionId, &elem->sessionId)) |
249 | 0 | goto still_active; |
250 | | |
251 | 0 | session_list_entry *session; |
252 | 0 | LIST_FOREACH(session, &ctx->psm->sc.server->sessions, pointers) { |
253 | 0 | if(UA_NodeId_equal(&session->session.sessionId, &elem->sessionId)) |
254 | 0 | goto still_active; |
255 | 0 | } |
256 | | |
257 | 0 | ctx->psm->reserveIdsSize--; |
258 | 0 | UA_String_clear(&elem->transportProfileUri); |
259 | 0 | UA_free(elem); |
260 | 0 | return NULL; |
261 | | |
262 | 0 | still_active: |
263 | 0 | ZIP_INSERT(UA_ReserveIdTree, &ctx->newTree, elem); |
264 | 0 | return NULL; |
265 | 0 | } |
266 | | |
267 | | void |
268 | 0 | UA_PubSubManager_freeIds(UA_PubSubManager *psm) { |
269 | 0 | struct RemoveInactiveReserveIdContext removeCtx; |
270 | 0 | removeCtx.psm = psm; |
271 | 0 | removeCtx.newTree.root = NULL; |
272 | 0 | ZIP_ITER(UA_ReserveIdTree, &psm->reserveIds, |
273 | 0 | removeInactiveReserveId, &removeCtx); |
274 | 0 | psm->reserveIds = removeCtx.newTree; |
275 | 0 | } |
276 | | |
277 | | UA_StatusCode |
278 | | UA_PubSubManager_reserveIds(UA_PubSubManager *psm, UA_NodeId sessionId, UA_UInt16 numRegWriterGroupIds, |
279 | | UA_UInt16 numRegDataSetWriterIds, UA_String transportProfileUri, |
280 | 0 | UA_UInt16 **writerGroupIds, UA_UInt16 **dataSetWriterIds) { |
281 | 0 | UA_PubSubManager_freeIds(psm); |
282 | | |
283 | | /* Check the validation of the transportProfileUri */ |
284 | 0 | UA_String profile_1 = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"); |
285 | 0 | UA_String profile_2 = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"); |
286 | 0 | UA_String profile_3 = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp"); |
287 | 0 | if(!UA_String_equal(&transportProfileUri, &profile_1) && |
288 | 0 | !UA_String_equal(&transportProfileUri, &profile_2) && |
289 | 0 | !UA_String_equal(&transportProfileUri, &profile_3)) { |
290 | 0 | UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB, |
291 | 0 | "PubSub ReserveId creation failed. No valid transport profile uri."); |
292 | 0 | return UA_STATUSCODE_BADINVALIDARGUMENT; |
293 | 0 | } |
294 | 0 | *writerGroupIds = (UA_UInt16*)UA_Array_new(numRegWriterGroupIds, &UA_TYPES[UA_TYPES_UINT16]); |
295 | 0 | *dataSetWriterIds = (UA_UInt16*)UA_Array_new(numRegDataSetWriterIds, &UA_TYPES[UA_TYPES_UINT16]); |
296 | |
|
297 | 0 | for(int i = 0; i < numRegWriterGroupIds; i++) { |
298 | 0 | (*writerGroupIds)[i] = |
299 | 0 | UA_ReserveId_createId(psm, sessionId, transportProfileUri, UA_WRITER_GROUP); |
300 | 0 | } |
301 | 0 | for(int i = 0; i < numRegDataSetWriterIds; i++) { |
302 | 0 | (*dataSetWriterIds)[i] = |
303 | 0 | UA_ReserveId_createId(psm, sessionId, transportProfileUri, UA_DATA_SET_WRITER); |
304 | 0 | } |
305 | 0 | return UA_STATUSCODE_GOOD; |
306 | 0 | } |
307 | | |
308 | | /* Calculate the time difference between current time and UTC (00:00) on January |
309 | | * 1, 2000. */ |
310 | | UA_UInt32 |
311 | 0 | UA_PubSubConfigurationVersionTimeDifference(UA_DateTime now) { |
312 | 0 | UA_UInt32 timeDiffSince2000 = (UA_UInt32)(now - UA_DATETIMESTAMP_2000); |
313 | 0 | return timeDiffSince2000; |
314 | 0 | } |
315 | | |
316 | | /* Generate a new unique NodeId. This NodeId will be used for the information |
317 | | * model representation of PubSub entities. */ |
318 | | #ifndef UA_ENABLE_PUBSUB_INFORMATIONMODEL |
319 | | void |
320 | | UA_PubSubManager_generateUniqueNodeId(UA_PubSubManager *psm, UA_NodeId *nodeId) { |
321 | | *nodeId = UA_NODEID_NUMERIC(1, ++psm->uniqueIdCount); |
322 | | } |
323 | | #endif |
324 | | |
325 | | UA_Guid |
326 | 0 | UA_PubSubManager_generateUniqueGuid(UA_PubSubManager *psm) { |
327 | 0 | while(true) { |
328 | 0 | UA_NodeId testId = UA_NODEID_GUID(1, UA_Guid_random()); |
329 | 0 | const UA_Node *testNode = UA_NODESTORE_GET(psm->sc.server, &testId); |
330 | 0 | if(!testNode) |
331 | 0 | return testId.identifier.guid; |
332 | 0 | UA_NODESTORE_RELEASE(psm->sc.server, testNode); |
333 | 0 | } |
334 | 0 | } |
335 | | |
336 | | static UA_UInt64 |
337 | 551 | generateRandomUInt64(void) { |
338 | 551 | UA_UInt64 id = 0; |
339 | 551 | UA_Guid ident = UA_Guid_random(); |
340 | | |
341 | 551 | id = id + ident.data1; |
342 | 551 | id = (id << 32) + ident.data2; |
343 | 551 | id = (id << 16) + ident.data3; |
344 | 551 | return id; |
345 | 551 | } |
346 | | |
347 | | UA_StatusCode |
348 | 0 | UA_Server_enableAllPubSubComponents(UA_Server *server) { |
349 | 0 | lockServer(server); |
350 | 0 | UA_PubSubManager *psm = getPSM(server); |
351 | 0 | if(!psm) { |
352 | 0 | unlockServer(server); |
353 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
354 | 0 | } |
355 | | |
356 | 0 | UA_StatusCode res = UA_PubSubManager_start(&psm->sc, server); |
357 | 0 | if(res != UA_STATUSCODE_GOOD) |
358 | 0 | return res; |
359 | | |
360 | 0 | UA_PubSubConnection *c; |
361 | 0 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
362 | 0 | UA_WriterGroup *wg; |
363 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
364 | 0 | UA_DataSetWriter *dsw; |
365 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) { |
366 | 0 | res |= UA_DataSetWriter_setPubSubState(psm, dsw, UA_PUBSUBSTATE_OPERATIONAL); |
367 | 0 | } |
368 | 0 | res |= UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_OPERATIONAL); |
369 | 0 | } |
370 | |
|
371 | 0 | UA_ReaderGroup *rg; |
372 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
373 | 0 | UA_DataSetReader *dsr; |
374 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) { |
375 | 0 | UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_OPERATIONAL, |
376 | 0 | UA_STATUSCODE_GOOD); |
377 | 0 | } |
378 | 0 | res |= UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_OPERATIONAL); |
379 | 0 | } |
380 | |
|
381 | 0 | res |= UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_OPERATIONAL); |
382 | 0 | } |
383 | |
|
384 | 0 | unlockServer(server); |
385 | 0 | return res; |
386 | 0 | } |
387 | | |
388 | | static void |
389 | 300 | disableAllPubSubComponents(UA_PubSubManager *psm) { |
390 | 300 | UA_PubSubConnection *c; |
391 | 300 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
392 | 0 | UA_WriterGroup *wg; |
393 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
394 | 0 | UA_DataSetWriter *dsw; |
395 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) { |
396 | 0 | UA_DataSetWriter_setPubSubState(psm, dsw, UA_PUBSUBSTATE_DISABLED); |
397 | 0 | } |
398 | 0 | UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_DISABLED); |
399 | 0 | } |
400 | |
|
401 | 0 | UA_ReaderGroup *rg; |
402 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
403 | 0 | UA_DataSetReader *dsr; |
404 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) { |
405 | 0 | UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_DISABLED, |
406 | 0 | UA_STATUSCODE_BADSHUTDOWN); |
407 | 0 | } |
408 | 0 | UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_DISABLED); |
409 | 0 | } |
410 | |
|
411 | 0 | UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED); |
412 | 0 | } |
413 | 300 | } |
414 | | |
415 | | void |
416 | 0 | UA_Server_disableAllPubSubComponents(UA_Server *server) { |
417 | 0 | lockServer(server); |
418 | 0 | UA_PubSubManager *psm = getPSM(server); |
419 | 0 | if(psm) |
420 | 0 | UA_PubSubManager_stop(&psm->sc); /* Calls disableAll internally */ |
421 | 0 | unlockServer(server); |
422 | 0 | } |
423 | | |
424 | | static UA_StatusCode |
425 | | getPubSubComponentType(UA_PubSubManager *psm, UA_NodeId componentId, |
426 | 0 | UA_PubSubComponentType *outType) { |
427 | 0 | UA_PubSubConnection *c; |
428 | 0 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
429 | 0 | if(UA_NodeId_equal(&componentId, &c->head.identifier)) { |
430 | 0 | *outType = c->head.componentType; |
431 | 0 | return UA_STATUSCODE_GOOD; |
432 | 0 | } |
433 | | |
434 | 0 | UA_WriterGroup *wg; |
435 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
436 | 0 | if(UA_NodeId_equal(&componentId, &wg->head.identifier)) { |
437 | 0 | *outType = wg->head.componentType; |
438 | 0 | return UA_STATUSCODE_GOOD; |
439 | 0 | } |
440 | | |
441 | 0 | UA_DataSetWriter *dsw; |
442 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) { |
443 | 0 | if(UA_NodeId_equal(&componentId, &dsw->head.identifier)) { |
444 | 0 | *outType = dsw->head.componentType; |
445 | 0 | return UA_STATUSCODE_GOOD; |
446 | 0 | } |
447 | 0 | } |
448 | 0 | } |
449 | | |
450 | 0 | UA_ReaderGroup *rg; |
451 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
452 | 0 | if(UA_NodeId_equal(&componentId, &rg->head.identifier)) { |
453 | 0 | *outType = rg->head.componentType; |
454 | 0 | return UA_STATUSCODE_GOOD; |
455 | 0 | } |
456 | | |
457 | 0 | UA_DataSetReader *dsr; |
458 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) { |
459 | 0 | if(UA_NodeId_equal(&componentId, &dsr->head.identifier)) { |
460 | 0 | *outType = dsr->head.componentType; |
461 | 0 | return UA_STATUSCODE_GOOD; |
462 | 0 | } |
463 | 0 | } |
464 | 0 | } |
465 | 0 | } |
466 | | |
467 | 0 | return UA_STATUSCODE_BADNOTFOUND; |
468 | 0 | } |
469 | | |
470 | | UA_StatusCode |
471 | | UA_Server_getPubSubComponentType(UA_Server *server, UA_NodeId componentId, |
472 | 0 | UA_PubSubComponentType *outType) { |
473 | 0 | if(!outType) |
474 | 0 | return UA_STATUSCODE_BADINVALIDARGUMENT; |
475 | 0 | lockServer(server); |
476 | 0 | UA_PubSubManager *psm = getPSM(server); |
477 | 0 | UA_StatusCode res = (psm) ? |
478 | 0 | getPubSubComponentType(psm, componentId, outType) : UA_STATUSCODE_BADINTERNALERROR; |
479 | 0 | unlockServer(server); |
480 | 0 | return res; |
481 | 0 | } |
482 | | |
483 | | static UA_StatusCode |
484 | | getPubSubComponentParent(UA_PubSubManager *psm, UA_NodeId componentId, |
485 | 0 | UA_NodeId *outParent) { |
486 | 0 | UA_PubSubConnection *c; |
487 | 0 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
488 | 0 | if(UA_NodeId_equal(&componentId, &c->head.identifier)) |
489 | 0 | return UA_STATUSCODE_BADNOTSUPPORTED; |
490 | | |
491 | 0 | UA_WriterGroup *wg; |
492 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
493 | 0 | if(UA_NodeId_equal(&componentId, &wg->head.identifier)) |
494 | 0 | return UA_NodeId_copy(&c->head.identifier, outParent); |
495 | | |
496 | 0 | UA_DataSetWriter *dsw; |
497 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) { |
498 | 0 | if(UA_NodeId_equal(&componentId, &dsw->head.identifier)) |
499 | 0 | return UA_NodeId_copy(&wg->head.identifier, outParent); |
500 | 0 | } |
501 | 0 | } |
502 | | |
503 | 0 | UA_ReaderGroup *rg; |
504 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
505 | 0 | if(UA_NodeId_equal(&componentId, &rg->head.identifier)) |
506 | 0 | return UA_NodeId_copy(&c->head.identifier, outParent); |
507 | | |
508 | 0 | UA_DataSetReader *dsr; |
509 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) { |
510 | 0 | if(UA_NodeId_equal(&componentId, &dsr->head.identifier)) |
511 | 0 | return UA_NodeId_copy(&rg->head.identifier, outParent); |
512 | 0 | } |
513 | 0 | } |
514 | 0 | } |
515 | | |
516 | 0 | return UA_STATUSCODE_BADNOTFOUND; |
517 | 0 | } |
518 | | |
519 | | UA_StatusCode |
520 | | UA_Server_getPubSubComponentParent(UA_Server *server, UA_NodeId componentId, |
521 | 0 | UA_NodeId *outParent) { |
522 | 0 | if(!outParent) |
523 | 0 | return UA_STATUSCODE_BADINVALIDARGUMENT; |
524 | 0 | lockServer(server); |
525 | 0 | UA_PubSubManager *psm = getPSM(server); |
526 | 0 | UA_StatusCode res = (psm) ? |
527 | 0 | getPubSubComponentParent(psm, componentId, outParent) : UA_STATUSCODE_BADINTERNALERROR; |
528 | 0 | unlockServer(server); |
529 | 0 | return res; |
530 | 0 | } |
531 | | |
532 | | static UA_StatusCode |
533 | | getPubSubComponentChildren(UA_PubSubManager *psm, UA_NodeId componentId, |
534 | 0 | size_t *outChildrenSize, UA_NodeId **outChildren) { |
535 | 0 | UA_WriterGroup *wg; |
536 | 0 | UA_ReaderGroup *rg; |
537 | 0 | UA_DataSetWriter *dsw; |
538 | 0 | UA_DataSetReader *dsr; |
539 | 0 | UA_PubSubConnection *c; |
540 | |
|
541 | 0 | UA_StatusCode res = UA_STATUSCODE_GOOD; |
542 | 0 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
543 | 0 | if(UA_NodeId_equal(&componentId, &c->head.identifier)) { |
544 | | /* Count the children */ |
545 | 0 | size_t children = 0; |
546 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) |
547 | 0 | children++; |
548 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) |
549 | 0 | children++; |
550 | | |
551 | | /* Empty array? */ |
552 | 0 | if(children == 0) { |
553 | 0 | *outChildren = NULL; |
554 | 0 | *outChildrenSize = 0; |
555 | 0 | return UA_STATUSCODE_GOOD; |
556 | 0 | } |
557 | | |
558 | | /* Allocate the array */ |
559 | 0 | *outChildren = (UA_NodeId*)UA_calloc(children, sizeof(UA_NodeId)); |
560 | 0 | if(!*outChildren) |
561 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
562 | 0 | *outChildrenSize = children; |
563 | | |
564 | | /* Copy the NodeIds */ |
565 | 0 | size_t pos = 0; |
566 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
567 | 0 | res |= UA_NodeId_copy(&wg->head.identifier, (*outChildren) + pos); |
568 | 0 | pos++; |
569 | 0 | } |
570 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
571 | 0 | res |= UA_NodeId_copy(&rg->head.identifier, (*outChildren) + pos); |
572 | 0 | pos++; |
573 | 0 | } |
574 | 0 | goto out; |
575 | 0 | } |
576 | | |
577 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
578 | 0 | if(UA_NodeId_equal(&componentId, &wg->head.identifier)) { |
579 | | /* Count the children */ |
580 | 0 | size_t children = 0; |
581 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) |
582 | 0 | children++; |
583 | | |
584 | | /* Empty array? */ |
585 | 0 | if(children == 0) { |
586 | 0 | *outChildren = NULL; |
587 | 0 | *outChildrenSize = 0; |
588 | 0 | return UA_STATUSCODE_GOOD; |
589 | 0 | } |
590 | | |
591 | | /* Allocate the array */ |
592 | 0 | *outChildren = (UA_NodeId*)UA_calloc(children, sizeof(UA_NodeId)); |
593 | 0 | if(!*outChildren) |
594 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
595 | 0 | *outChildrenSize = children; |
596 | | |
597 | | /* Copy the NodeIds */ |
598 | 0 | size_t pos = 0; |
599 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) { |
600 | 0 | res |= UA_NodeId_copy(&dsw->head.identifier, (*outChildren) + pos); |
601 | 0 | pos++; |
602 | 0 | } |
603 | 0 | goto out; |
604 | 0 | } |
605 | | |
606 | | /* DataSetWriter have no children (with a state machine) */ |
607 | 0 | LIST_FOREACH(dsw, &wg->writers, listEntry) { |
608 | 0 | if(UA_NodeId_equal(&componentId, &dsw->head.identifier)) |
609 | 0 | return UA_STATUSCODE_BADNOTSUPPORTED; |
610 | 0 | } |
611 | 0 | } |
612 | | |
613 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
614 | 0 | if(UA_NodeId_equal(&componentId, &rg->head.identifier)) { |
615 | | /* Count the children */ |
616 | 0 | size_t children = 0; |
617 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) |
618 | 0 | children++; |
619 | | |
620 | | /* Empty array? */ |
621 | 0 | if(children == 0) { |
622 | 0 | *outChildren = NULL; |
623 | 0 | *outChildrenSize = 0; |
624 | 0 | return UA_STATUSCODE_GOOD; |
625 | 0 | } |
626 | | |
627 | | /* Allocate the array */ |
628 | 0 | *outChildren = (UA_NodeId*)UA_calloc(children, sizeof(UA_NodeId)); |
629 | 0 | if(!*outChildren) |
630 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
631 | 0 | *outChildrenSize = children; |
632 | | |
633 | | /* Copy the NodeIds */ |
634 | 0 | size_t pos = 0; |
635 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) { |
636 | 0 | res |= UA_NodeId_copy(&dsr->head.identifier, (*outChildren) + pos); |
637 | 0 | pos++; |
638 | 0 | } |
639 | 0 | goto out; |
640 | 0 | } |
641 | | |
642 | | /* DataSetReader have no children (with a state machine) */ |
643 | 0 | LIST_FOREACH(dsr, &rg->readers, listEntry) { |
644 | 0 | if(UA_NodeId_equal(&componentId, &dsr->head.identifier)) |
645 | 0 | return UA_STATUSCODE_BADNOTSUPPORTED; |
646 | 0 | } |
647 | 0 | } |
648 | 0 | } |
649 | | |
650 | 0 | return UA_STATUSCODE_BADNOTFOUND; |
651 | | |
652 | 0 | out: |
653 | 0 | if(res != UA_STATUSCODE_GOOD) |
654 | 0 | UA_Array_delete(*outChildren, *outChildrenSize, &UA_TYPES[UA_TYPES_NODEID]); |
655 | 0 | return res; |
656 | 0 | } |
657 | | |
658 | | UA_StatusCode |
659 | | UA_Server_getPubSubComponentChildren(UA_Server *server, UA_NodeId componentId, |
660 | 0 | size_t *outChildrenSize, UA_NodeId **outChildren) { |
661 | 0 | if(!outChildrenSize || !outChildren) |
662 | 0 | return UA_STATUSCODE_BADINVALIDARGUMENT; |
663 | 0 | lockServer(server); |
664 | 0 | UA_PubSubManager *psm = getPSM(server); |
665 | 0 | UA_StatusCode res = (psm) ? |
666 | 0 | getPubSubComponentChildren(psm, componentId, |
667 | 0 | outChildrenSize, outChildren) : UA_STATUSCODE_BADINTERNALERROR; |
668 | 0 | unlockServer(server); |
669 | 0 | return res; |
670 | 0 | } |
671 | | |
672 | | void |
673 | 600 | UA_PubSubManager_setState(UA_PubSubManager *psm, UA_LifecycleState state) { |
674 | 600 | if(state == UA_LIFECYCLESTATE_STOPPED) |
675 | 300 | state = UA_LIFECYCLESTATE_STOPPING; |
676 | | |
677 | | /* Check if all connections are closed if we are started */ |
678 | 600 | if(state == UA_LIFECYCLESTATE_STOPPING) { |
679 | 300 | UA_PubSubConnection *c; |
680 | 300 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
681 | 0 | if(c->sendChannel != 0 || c->recvChannelsSize > 0) |
682 | 0 | goto set_state; |
683 | | |
684 | 0 | UA_WriterGroup *wg; |
685 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
686 | 0 | if(wg->sendChannel > 0) |
687 | 0 | goto set_state; |
688 | 0 | } |
689 | | |
690 | 0 | UA_ReaderGroup *rg; |
691 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
692 | 0 | if(rg->recvChannelsSize > 0) |
693 | 0 | goto set_state; |
694 | 0 | } |
695 | 0 | } |
696 | | |
697 | | /* No open connections -> stopped */ |
698 | 300 | state = UA_LIFECYCLESTATE_STOPPED; |
699 | 300 | } |
700 | | |
701 | 600 | set_state: |
702 | 600 | if(state == psm->sc.state) |
703 | 0 | return; |
704 | 600 | psm->sc.state = state; |
705 | 600 | if(psm->sc.notifyState) |
706 | 0 | psm->sc.notifyState(&psm->sc, state); |
707 | | |
708 | | /* When we just started, trigger all connections to go from PAUSED to |
709 | | * OPERATIONAL */ |
710 | 600 | if(state == UA_LIFECYCLESTATE_STARTED) { |
711 | 300 | UA_PubSubConnection *c; |
712 | 300 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
713 | 0 | UA_PubSubConnection_setPubSubState(psm, c, c->head.state); |
714 | 0 | } |
715 | 300 | } |
716 | 600 | } |
717 | | |
718 | | static UA_StatusCode |
719 | 300 | UA_PubSubManager_start(UA_ServerComponent *sc, UA_Server *server) { |
720 | 300 | UA_PubSubManager *psm = (UA_PubSubManager*)sc; |
721 | 300 | if(psm->sc.state == UA_LIFECYCLESTATE_STOPPING) { |
722 | 0 | UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB, |
723 | 0 | "The PubSubManager is still stopping"); |
724 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
725 | 0 | } |
726 | | |
727 | | /* Re-cache for the case that the configuration has been updated */ |
728 | 300 | psm->logging = server->config.logging; |
729 | | |
730 | 300 | UA_PubSubManager_setState(psm, UA_LIFECYCLESTATE_STARTED); |
731 | | |
732 | 300 | return UA_STATUSCODE_GOOD; |
733 | 300 | } |
734 | | |
735 | | static void |
736 | 300 | UA_PubSubManager_stop(UA_ServerComponent *sc) { |
737 | 300 | UA_PubSubManager *psm = (UA_PubSubManager*)sc; |
738 | 300 | disableAllPubSubComponents(psm); |
739 | 300 | UA_PubSubManager_setState(psm, UA_LIFECYCLESTATE_STOPPED); |
740 | 300 | } |
741 | | |
742 | | UA_StatusCode |
743 | 551 | UA_PubSubManager_clear(UA_PubSubManager *psm) { |
744 | 551 | if(psm->sc.state != UA_LIFECYCLESTATE_STOPPED) { |
745 | 0 | UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB, |
746 | 0 | "Cannot delete the PubSubManager because " |
747 | 0 | "it is not stopped"); |
748 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
749 | 0 | } |
750 | | |
751 | 551 | UA_LOCK_ASSERT(&psm->sc.server->serviceMutex); |
752 | | |
753 | | /* Remove Connections - this also remove WriterGroups and ReaderGroups */ |
754 | 551 | UA_PubSubConnection *c, *tmpC; |
755 | 551 | TAILQ_FOREACH_SAFE(c, &psm->connections, listEntry, tmpC) { |
756 | 0 | UA_PubSubConnection_delete(psm, c); |
757 | 0 | } |
758 | | |
759 | | /* Remove the DataSets */ |
760 | 551 | UA_PublishedDataSet *tmpPDS1, *tmpPDS2; |
761 | 551 | TAILQ_FOREACH_SAFE(tmpPDS1, &psm->publishedDataSets, listEntry, tmpPDS2) { |
762 | 0 | UA_PublishedDataSet_remove(psm, tmpPDS1); |
763 | 0 | } |
764 | | |
765 | | /* Remove the ReserveIds*/ |
766 | 551 | ZIP_ITER(UA_ReserveIdTree, &psm->reserveIds, removeReserveId, NULL); |
767 | 551 | psm->reserveIdsSize = 0; |
768 | | |
769 | | /* Delete subscribed datasets */ |
770 | 551 | UA_SubscribedDataSet *tmpSDS1, *tmpSDS2; |
771 | 551 | TAILQ_FOREACH_SAFE(tmpSDS1, &psm->subscribedDataSets, listEntry, tmpSDS2) { |
772 | 0 | UA_SubscribedDataSet_remove(psm, tmpSDS1); |
773 | 0 | } |
774 | | |
775 | | #ifdef UA_ENABLE_PUBSUB_SKS |
776 | | /* Remove the SecurityGroups */ |
777 | | UA_SecurityGroup *tmpSG1, *tmpSG2; |
778 | | TAILQ_FOREACH_SAFE(tmpSG1, &psm->securityGroups, listEntry, tmpSG2) { |
779 | | UA_SecurityGroup_remove(psm, tmpSG1); |
780 | | } |
781 | | |
782 | | /* Remove the keyStorages */ |
783 | | UA_PubSubKeyStorage *ks, *ksTmp; |
784 | | LIST_FOREACH_SAFE(ks, &psm->pubSubKeyList, keyStorageList, ksTmp) { |
785 | | UA_PubSubKeyStorage_delete(psm, ks); |
786 | | } |
787 | | #endif |
788 | | |
789 | 551 | return UA_STATUSCODE_GOOD; |
790 | 551 | } |
791 | | |
792 | | UA_ServerComponent * |
793 | 551 | UA_PubSubManager_new(UA_Server *server) { |
794 | 551 | UA_PubSubManager *psm = (UA_PubSubManager*)UA_calloc(1, sizeof(UA_PubSubManager)); |
795 | 551 | if(!psm) |
796 | 0 | return NULL; |
797 | | |
798 | 551 | psm->sc.server = server; |
799 | 551 | psm->sc.name = UA_STRING("pubsub"); |
800 | 551 | psm->sc.start = UA_PubSubManager_start; |
801 | 551 | psm->sc.stop = UA_PubSubManager_stop; |
802 | 551 | psm->sc.clear = (UA_StatusCode (*)(UA_ServerComponent *))UA_PubSubManager_clear; |
803 | | |
804 | | /* Set the logging shortcut */ |
805 | 551 | psm->logging = server->config.logging; |
806 | | |
807 | | /* TODO: Using the Mac address to generate the defaultPublisherId. |
808 | | * In the future, this can be retrieved from the Eventloop. */ |
809 | 551 | psm->defaultPublisherId = generateRandomUInt64(); |
810 | | |
811 | 551 | TAILQ_INIT(&psm->connections); |
812 | 551 | TAILQ_INIT(&psm->publishedDataSets); |
813 | 551 | TAILQ_INIT(&psm->subscribedDataSets); |
814 | | |
815 | | #ifdef UA_ENABLE_PUBSUB_SKS |
816 | | TAILQ_INIT(&psm->securityGroups); |
817 | | #endif |
818 | | |
819 | 551 | #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL |
820 | | /* Build PubSub information model */ |
821 | 551 | initPubSubNS0(server); |
822 | 551 | #endif |
823 | | |
824 | 551 | return &psm->sc; |
825 | 551 | } |
826 | | |
827 | | #endif /* UA_ENABLE_PUBSUB */ |