/src/open62541_15/src/pubsub/ua_pubsub_connection.c
Line | Count | Source |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright (c) 2017-2025 Fraunhofer IOSB (Author: Andreas Ebner) |
6 | | * Copyright (c) 2019, 2022, 2024 Fraunhofer IOSB (Author: Julius Pfrommer) |
7 | | * Copyright (c) 2019 Kalycito Infotech Private Limited |
8 | | * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes) |
9 | | * Copyright (c) 2022 Siemens AG (Author: Thomas Fischer) |
10 | | * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf) |
11 | | */ |
12 | | |
13 | | #include "ua_pubsub_internal.h" |
14 | | |
15 | | #ifdef UA_ENABLE_PUBSUB /* conditional compilation */ |
16 | | |
17 | | static UA_Boolean |
18 | | UA_PubSubConnection_canConnect(UA_PubSubConnection *c); |
19 | | |
20 | | static UA_StatusCode |
21 | | UA_PubSubConnection_connect(UA_PubSubManager *psm, UA_PubSubConnection *c, |
22 | | UA_Boolean validate); |
23 | | |
24 | | static void |
25 | | UA_PubSubConnection_process(UA_PubSubManager *psm, UA_PubSubConnection *c, |
26 | | const UA_ByteString msg); |
27 | | |
28 | | static void |
29 | | UA_PubSubConnection_disconnect(UA_PubSubConnection *c); |
30 | | |
31 | | UA_StatusCode |
32 | | UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src, |
33 | 4.01k | UA_PubSubConnectionConfig *dst) { |
34 | 4.01k | UA_StatusCode res = UA_STATUSCODE_GOOD; |
35 | 4.01k | memcpy(dst, src, sizeof(UA_PubSubConnectionConfig)); |
36 | 4.01k | res |= UA_PublisherId_copy(&src->publisherId, &dst->publisherId); |
37 | 4.01k | res |= UA_String_copy(&src->name, &dst->name); |
38 | 4.01k | res |= UA_Variant_copy(&src->address, &dst->address); |
39 | 4.01k | res |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri); |
40 | 4.01k | res |= UA_Variant_copy(&src->connectionTransportSettings, |
41 | 4.01k | &dst->connectionTransportSettings); |
42 | 4.01k | res |= UA_KeyValueMap_copy(&src->connectionProperties, |
43 | 4.01k | &dst->connectionProperties); |
44 | 4.01k | if(res != UA_STATUSCODE_GOOD) |
45 | 0 | UA_PubSubConnectionConfig_clear(dst); |
46 | 4.01k | return res; |
47 | 4.01k | } |
48 | | |
49 | | UA_PubSubConnection * |
50 | 0 | UA_PubSubConnection_find(UA_PubSubManager *psm, const UA_NodeId id) { |
51 | 0 | if(!psm) |
52 | 0 | return NULL; |
53 | 0 | UA_PubSubConnection *c; |
54 | 0 | TAILQ_FOREACH(c, &psm->connections, listEntry) { |
55 | 0 | if(UA_NodeId_equal(&id, &c->head.identifier)) |
56 | 0 | break; |
57 | 0 | } |
58 | 0 | return c; |
59 | 0 | } |
60 | | |
61 | | void |
62 | 8.02k | UA_PubSubConnectionConfig_clear(UA_PubSubConnectionConfig *connectionConfig) { |
63 | 8.02k | UA_PublisherId_clear(&connectionConfig->publisherId); |
64 | 8.02k | UA_String_clear(&connectionConfig->name); |
65 | 8.02k | UA_String_clear(&connectionConfig->transportProfileUri); |
66 | 8.02k | UA_Variant_clear(&connectionConfig->connectionTransportSettings); |
67 | 8.02k | UA_Variant_clear(&connectionConfig->address); |
68 | 8.02k | UA_KeyValueMap_clear(&connectionConfig->connectionProperties); |
69 | 8.02k | } |
70 | | |
71 | | UA_StatusCode |
72 | | UA_PubSubConnection_create(UA_PubSubManager *psm, const UA_PubSubConnectionConfig *cc, |
73 | 0 | UA_NodeId *cId) { |
74 | | /* Allocate */ |
75 | 0 | UA_PubSubConnection *c = (UA_PubSubConnection*) |
76 | 0 | UA_calloc(1, sizeof(UA_PubSubConnection)); |
77 | 0 | if(!c) |
78 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
79 | | |
80 | 0 | c->head.componentType = UA_PUBSUBCOMPONENT_CONNECTION; |
81 | | |
82 | | /* Copy the connection config */ |
83 | 0 | UA_StatusCode ret = UA_PubSubConnectionConfig_copy(cc, &c->config); |
84 | 0 | UA_CHECK_STATUS(ret, UA_free(c); return ret); |
85 | | |
86 | | /* Assign the connection identifier */ |
87 | 0 | #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL |
88 | | /* Internally create a unique id */ |
89 | 0 | addPubSubConnectionRepresentation(psm->sc.server, c); |
90 | | #else |
91 | | /* Create a unique NodeId that does not correspond to a Node */ |
92 | | UA_PubSubManager_generateUniqueNodeId(psm, &c->head.identifier); |
93 | | #endif |
94 | | |
95 | | /* Register */ |
96 | 0 | TAILQ_INSERT_HEAD(&psm->connections, c, listEntry); |
97 | 0 | psm->connectionsSize++; |
98 | | |
99 | | /* Validate-connect to check the parameters */ |
100 | 0 | ret = UA_PubSubConnection_connect(psm, c, true); |
101 | 0 | if(ret != UA_STATUSCODE_GOOD) { |
102 | 0 | UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB, |
103 | 0 | "Could not create the PubSubConnection. " |
104 | 0 | "The connection parameters did not validate."); |
105 | 0 | UA_PubSubConnection_delete(psm, c); |
106 | 0 | return ret; |
107 | 0 | } |
108 | | |
109 | | /* Cache the log string */ |
110 | 0 | char tmpLogIdStr[128]; |
111 | 0 | mp_snprintf(tmpLogIdStr, 128, "PubSubConnection %N\t| ", c->head.identifier); |
112 | 0 | c->head.logIdString = UA_STRING_ALLOC(tmpLogIdStr); |
113 | | |
114 | | /* Notify the application that a new Connection was created. |
115 | | * This may internally adjust the config */ |
116 | 0 | UA_Server *server = psm->sc.server; |
117 | 0 | if(server->config.pubSubConfig.componentLifecycleCallback) { |
118 | 0 | UA_StatusCode res = server->config.pubSubConfig. |
119 | 0 | componentLifecycleCallback(server, c->head.identifier, |
120 | 0 | UA_PUBSUBCOMPONENT_CONNECTION, false); |
121 | 0 | if(res != UA_STATUSCODE_GOOD) { |
122 | 0 | UA_PubSubConnection_delete(psm, c); |
123 | 0 | return res; |
124 | 0 | } |
125 | 0 | } |
126 | | |
127 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "Connection created (State: %s)", |
128 | 0 | UA_PubSubState_name(c->head.state)); |
129 | | |
130 | | /* Copy the created NodeId to the output. Cannot fail as we create a |
131 | | * numerical NodeId. */ |
132 | 0 | if(cId) |
133 | 0 | UA_NodeId_copy(&c->head.identifier, cId); |
134 | | |
135 | | /* Enable the Connection immediately if the enabled flag is set */ |
136 | 0 | if(cc->enabled) |
137 | 0 | UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_OPERATIONAL); |
138 | |
|
139 | 0 | return UA_STATUSCODE_GOOD; |
140 | 0 | } |
141 | | |
142 | | static void |
143 | 0 | delayedPubSubConnection_delete(void *application, void *context) { |
144 | 0 | UA_PubSubManager *psm = (UA_PubSubManager*)application; |
145 | 0 | UA_Server *server = psm->sc.server; |
146 | 0 | UA_PubSubConnection *c = (UA_PubSubConnection*)context; |
147 | 0 | lockServer(server); |
148 | 0 | UA_PubSubConnection_delete(psm, c); |
149 | 0 | unlockServer(server); |
150 | 0 | } |
151 | | |
152 | | /* Clean up the PubSubConnection. If no EventLoop connection is attached we can |
153 | | * immediately free. Otherwise we close the EventLoop connections and free in |
154 | | * the connection callback. */ |
155 | | UA_StatusCode |
156 | 0 | UA_PubSubConnection_delete(UA_PubSubManager *psm, UA_PubSubConnection *c) { |
157 | 0 | UA_LOCK_ASSERT(&psm->sc.server->serviceMutex); |
158 | | |
159 | | /* Check with the application if we can remove */ |
160 | 0 | UA_Server *server = psm->sc.server; |
161 | 0 | if(server->config.pubSubConfig.componentLifecycleCallback) { |
162 | 0 | UA_StatusCode res = server->config.pubSubConfig. |
163 | 0 | componentLifecycleCallback(server, c->head.identifier, |
164 | 0 | UA_PUBSUBCOMPONENT_CONNECTION, true); |
165 | 0 | if(res != UA_STATUSCODE_GOOD) |
166 | 0 | return res; |
167 | 0 | } |
168 | | |
169 | | /* Disable (and disconnect) and set the deleteFlag. This prevents a |
170 | | * reconnect and triggers the deletion when the last open socket is |
171 | | * closed. */ |
172 | 0 | c->deleteFlag = true; |
173 | 0 | UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED); |
174 | | |
175 | | /* Stop and all ReaderGroupds and WriterGroups attached to the Connection. |
176 | | * We need to disable all to remove the Connection.*/ |
177 | 0 | UA_ReaderGroup *rg, *tmpRg; |
178 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
179 | 0 | UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_DISABLED); |
180 | 0 | } |
181 | |
|
182 | 0 | UA_WriterGroup *wg, *tmpWg; |
183 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
184 | 0 | UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_DISABLED); |
185 | 0 | } |
186 | | |
187 | | /* Remove all ReaderGorups and WriterGroups */ |
188 | 0 | LIST_FOREACH_SAFE(rg, &c->readerGroups, listEntry, tmpRg) { |
189 | 0 | UA_ReaderGroup_remove(psm, rg); |
190 | 0 | } |
191 | |
|
192 | 0 | LIST_FOREACH_SAFE(wg, &c->writerGroups, listEntry, tmpWg) { |
193 | 0 | UA_WriterGroup_remove(psm, wg); |
194 | 0 | } |
195 | | |
196 | | /* Not all sockets are closed. This method will be called again */ |
197 | 0 | if(c->sendChannel != 0 || c->recvChannelsSize > 0) |
198 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
199 | | |
200 | | /* The WriterGroups / ReaderGroups are not deleted. Try again in the next |
201 | | * iteration of the event loop.*/ |
202 | 0 | if(!LIST_EMPTY(&c->writerGroups) || !LIST_EMPTY(&c->readerGroups)) { |
203 | 0 | UA_EventLoop *el = psm->sc.server->config.eventLoop; |
204 | 0 | c->dc.callback = delayedPubSubConnection_delete; |
205 | 0 | c->dc.application = psm; |
206 | 0 | c->dc.context = c; |
207 | 0 | el->addDelayedCallback(el, &c->dc); |
208 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
209 | 0 | } |
210 | | |
211 | | /* Remove from the information model */ |
212 | 0 | #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL |
213 | 0 | deleteNode(psm->sc.server, c->head.identifier, true); |
214 | 0 | #endif |
215 | | |
216 | | /* Unlink from the server */ |
217 | 0 | TAILQ_REMOVE(&psm->connections, c, listEntry); |
218 | 0 | psm->connectionsSize--; |
219 | |
|
220 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "Connection deleted"); |
221 | |
|
222 | 0 | UA_PubSubConnectionConfig_clear(&c->config); |
223 | 0 | UA_PubSubComponentHead_clear(&c->head); |
224 | 0 | UA_free(c); |
225 | |
|
226 | 0 | return UA_STATUSCODE_GOOD; |
227 | 0 | } |
228 | | |
229 | | static void |
230 | | UA_PubSubConnection_process(UA_PubSubManager *psm, UA_PubSubConnection *c, |
231 | 0 | const UA_ByteString msg) { |
232 | 0 | UA_LOG_TRACE_PUBSUB(psm->logging, c, "Processing a received buffer"); |
233 | |
|
234 | | #ifdef UA_DEBUG_DUMP_PKGS |
235 | | UA_dump_hex_pkg(msg.data, msg.length); |
236 | | #endif |
237 | |
|
238 | 0 | UA_Boolean processed = false; |
239 | 0 | UA_NetworkMessage nm; |
240 | 0 | memset(&nm, 0, sizeof(UA_NetworkMessage)); |
241 | | |
242 | | /* Decode the NetworkMessage with the first matching ReaderGroup */ |
243 | 0 | UA_ReaderGroup *rg; |
244 | 0 | UA_StatusCode res = UA_STATUSCODE_BADNOTFOUND; |
245 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
246 | 0 | if(rg->head.state != UA_PUBSUBSTATE_OPERATIONAL && |
247 | 0 | rg->head.state != UA_PUBSUBSTATE_PREOPERATIONAL) |
248 | 0 | continue; |
249 | 0 | if(rg->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP) { |
250 | 0 | res = UA_ReaderGroup_decodeNetworkMessage(psm, rg, msg, &nm); |
251 | 0 | } else { |
252 | 0 | #ifdef UA_ENABLE_JSON_ENCODING |
253 | 0 | res = UA_ReaderGroup_decodeNetworkMessageJSON(psm, rg, msg, &nm); |
254 | | #else |
255 | | res = UA_STATUSCODE_BADNOTSUPPORTED; |
256 | | UA_LOG_WARNING_PUBSUB(psm->logging, c, "JSON support is not activated"); |
257 | | #endif |
258 | 0 | } |
259 | 0 | if(res == UA_STATUSCODE_GOOD) |
260 | 0 | break; |
261 | 0 | } |
262 | |
|
263 | 0 | if(res != UA_STATUSCODE_GOOD) |
264 | 0 | goto finish; |
265 | | |
266 | | /* Process the received message for all ReaderGroups */ |
267 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
268 | 0 | if(rg->head.state != UA_PUBSUBSTATE_OPERATIONAL && |
269 | 0 | rg->head.state != UA_PUBSUBSTATE_PREOPERATIONAL) |
270 | 0 | continue; |
271 | 0 | processed |= UA_ReaderGroup_process(psm, rg, &nm); |
272 | 0 | } |
273 | 0 | UA_NetworkMessage_clear(&nm); |
274 | |
|
275 | 0 | finish: |
276 | 0 | if(!processed) { |
277 | 0 | UA_DateTime nowM = UA_DateTime_nowMonotonic(); |
278 | 0 | if(c->silenceErrorUntil < nowM) { |
279 | 0 | UA_LOG_WARNING_PUBSUB(psm->logging, c, |
280 | 0 | "Message received that could not be processed " |
281 | 0 | "with StatusCode %s. Check PublisherId, " |
282 | 0 | "WriterGroupId and DatasetWriterId", |
283 | 0 | UA_StatusCode_name(res)); |
284 | 0 | c->silenceErrorUntil = nowM + (UA_DateTime)(10.0 * UA_DATETIME_SEC); |
285 | 0 | } |
286 | 0 | } |
287 | 0 | } |
288 | | |
289 | | UA_StatusCode |
290 | | UA_PubSubConnection_setPubSubState(UA_PubSubManager *psm, UA_PubSubConnection *c, |
291 | 0 | UA_PubSubState targetState) { |
292 | 0 | if(c->deleteFlag && targetState != UA_PUBSUBSTATE_DISABLED) { |
293 | 0 | UA_LOG_WARNING_PUBSUB(psm->logging, c, |
294 | 0 | "The connection is being deleted. Can only be disabled."); |
295 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
296 | 0 | } |
297 | | |
298 | | /* Callback to modify the WriterGroup config and change the targetState |
299 | | * before the state machine executes */ |
300 | 0 | UA_Server *server = psm->sc.server; |
301 | 0 | if(server->config.pubSubConfig.beforeStateChangeCallback) { |
302 | 0 | server->config.pubSubConfig. |
303 | 0 | beforeStateChangeCallback(server, c->head.identifier, &targetState); |
304 | 0 | } |
305 | | |
306 | | /* Are we doing a top-level state update or recursively? */ |
307 | 0 | UA_StatusCode ret = UA_STATUSCODE_GOOD; |
308 | 0 | UA_PubSubState oldState = c->head.state; |
309 | 0 | UA_Boolean isTransient = c->head.transientState; |
310 | 0 | c->head.transientState = true; |
311 | | |
312 | | /* Custom state machine */ |
313 | 0 | if(c->config.customStateMachine) { |
314 | 0 | ret = c->config.customStateMachine(server, c->head.identifier, c->config.context, |
315 | 0 | &c->head.state, targetState); |
316 | 0 | goto finalize_state_machine; |
317 | 0 | } |
318 | | |
319 | | /* Internal state machine */ |
320 | 0 | switch(targetState) { |
321 | | /* Disabled or Error */ |
322 | 0 | case UA_PUBSUBSTATE_ERROR: |
323 | 0 | case UA_PUBSUBSTATE_DISABLED: |
324 | 0 | UA_PubSubConnection_disconnect(c); |
325 | 0 | c->head.state = targetState; |
326 | 0 | break; |
327 | | |
328 | 0 | case UA_PUBSUBSTATE_PAUSED: |
329 | 0 | case UA_PUBSUBSTATE_PREOPERATIONAL: |
330 | 0 | case UA_PUBSUBSTATE_OPERATIONAL: |
331 | | /* Cannot go operational if the PubSubManager is not started */ |
332 | 0 | if(psm->sc.state != UA_LIFECYCLESTATE_STARTED) { |
333 | | /* Avoid repeat warnings */ |
334 | 0 | if(oldState != UA_PUBSUBSTATE_PAUSED) { |
335 | 0 | UA_LOG_WARNING_PUBSUB(psm->logging, c, |
336 | 0 | "Cannot enable the connection while the " |
337 | 0 | "server is not running -> Paused State"); |
338 | 0 | } |
339 | 0 | c->head.state = UA_PUBSUBSTATE_PAUSED; |
340 | 0 | UA_PubSubConnection_disconnect(c); |
341 | 0 | break; |
342 | 0 | } |
343 | | |
344 | 0 | c->head.state = UA_PUBSUBSTATE_OPERATIONAL; |
345 | | |
346 | | /* Whether the connection needs to connect depends on whether a |
347 | | * ReaderGroup or WriterGroup is attached. If not, then we don't |
348 | | * open any connections. */ |
349 | 0 | if(UA_PubSubConnection_canConnect(c)) |
350 | 0 | ret = UA_PubSubConnection_connect(psm, c, false); |
351 | 0 | break; |
352 | | |
353 | | /* Unknown case */ |
354 | 0 | default: |
355 | 0 | ret = UA_STATUSCODE_BADINTERNALERROR; |
356 | 0 | break; |
357 | 0 | } |
358 | | |
359 | | /* Failure */ |
360 | 0 | if(ret != UA_STATUSCODE_GOOD) { |
361 | 0 | c->head.state = UA_PUBSUBSTATE_ERROR; |
362 | 0 | UA_PubSubConnection_disconnect(c); |
363 | 0 | } |
364 | |
|
365 | 0 | finalize_state_machine: |
366 | | |
367 | | /* Only the top-level state update (if recursive calls are happening) |
368 | | * notifies the application and updates Reader and WriterGroups */ |
369 | 0 | c->head.transientState = isTransient; |
370 | 0 | if(c->head.transientState) |
371 | 0 | return ret; |
372 | | |
373 | | /* No state change has happened */ |
374 | 0 | if(c->head.state == oldState) |
375 | 0 | return ret; |
376 | | |
377 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "%s -> %s", |
378 | 0 | UA_PubSubState_name(oldState), |
379 | 0 | UA_PubSubState_name(c->head.state)); |
380 | | |
381 | | /* Inform application about state change */ |
382 | 0 | if(server->config.pubSubConfig.stateChangeCallback) { |
383 | 0 | server->config.pubSubConfig. |
384 | 0 | stateChangeCallback(server, c->head.identifier, c->head.state, ret); |
385 | 0 | } |
386 | | |
387 | | /* Children evaluate their state machine after the state change of the parent. |
388 | | * Keep the current child state as the target state for the child. */ |
389 | 0 | UA_ReaderGroup *rg; |
390 | 0 | LIST_FOREACH(rg, &c->readerGroups, listEntry) { |
391 | 0 | if(psm->pubSubInitialSetupMode && rg->config.enabled) { |
392 | 0 | UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_OPERATIONAL); |
393 | 0 | } else { |
394 | 0 | UA_ReaderGroup_setPubSubState(psm, rg, rg->head.state); |
395 | 0 | } |
396 | 0 | } |
397 | 0 | UA_WriterGroup *wg; |
398 | 0 | LIST_FOREACH(wg, &c->writerGroups, listEntry) { |
399 | 0 | if(psm->pubSubInitialSetupMode && wg->config.enabled) { |
400 | 0 | UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_OPERATIONAL); |
401 | 0 | } else { |
402 | 0 | UA_WriterGroup_setPubSubState(psm, wg, wg->head.state); |
403 | 0 | } |
404 | 0 | } |
405 | | |
406 | | /* Update the PubSubManager state. It will go from STOPPING to STOPPED when |
407 | | * the last socket has closed. */ |
408 | 0 | UA_PubSubManager_setState(psm, psm->sc.state); |
409 | |
|
410 | 0 | return ret; |
411 | 0 | } |
412 | | |
413 | | static UA_StatusCode |
414 | 0 | enablePubSubConnection(UA_PubSubManager *psm, const UA_NodeId connectionId) { |
415 | 0 | UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId); |
416 | 0 | return (c) ? UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_OPERATIONAL) |
417 | 0 | : UA_STATUSCODE_BADNOTFOUND; |
418 | 0 | } |
419 | | |
420 | | static UA_StatusCode |
421 | 0 | disablePubSubConnection(UA_PubSubManager *psm, const UA_NodeId connectionId) { |
422 | 0 | UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId); |
423 | 0 | return (c) ? UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED) |
424 | 0 | : UA_STATUSCODE_BADNOTFOUND; |
425 | 0 | } |
426 | | |
427 | | /***********************/ |
428 | | /* Connection Handling */ |
429 | | /***********************/ |
430 | | |
431 | | static UA_StatusCode |
432 | | UA_PubSubConnection_connectUDP(UA_PubSubManager *psm, UA_PubSubConnection *c, |
433 | | UA_Boolean validate); |
434 | | |
435 | | static UA_StatusCode |
436 | | UA_PubSubConnection_connectETH(UA_PubSubManager *psm, UA_PubSubConnection *c, |
437 | | UA_Boolean validate); |
438 | | |
439 | | typedef struct { |
440 | | UA_String profileURI; |
441 | | UA_String protocol; |
442 | | UA_Boolean json; |
443 | | UA_StatusCode (*connect)(UA_PubSubManager *psm, UA_PubSubConnection *c, |
444 | | UA_Boolean validate); |
445 | | } ConnectionProfileMapping; |
446 | | |
447 | | static ConnectionProfileMapping connectionProfiles[UA_PUBSUB_PROFILES_SIZE] = { |
448 | | {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp"), |
449 | | UA_STRING_STATIC("udp"), false, UA_PubSubConnection_connectUDP}, |
450 | | {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"), |
451 | | UA_STRING_STATIC("mqtt"), false, NULL}, |
452 | | {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"), |
453 | | UA_STRING_STATIC("mqtt"), true, NULL}, |
454 | | {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp"), |
455 | | UA_STRING_STATIC("eth"), false, UA_PubSubConnection_connectETH} |
456 | | }; |
457 | | |
458 | | static void |
459 | | UA_PubSubConnection_detachConnection(UA_PubSubManager *psm, |
460 | | UA_ConnectionManager *cm, |
461 | | UA_PubSubConnection *c, |
462 | 0 | uintptr_t connectionId) { |
463 | 0 | if(c->sendChannel == connectionId) { |
464 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "Detach send-connection %S %u", |
465 | 0 | cm->protocol, (unsigned)connectionId); |
466 | 0 | c->sendChannel = 0; |
467 | 0 | return; |
468 | 0 | } |
469 | 0 | for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { |
470 | 0 | if(c->recvChannels[i] != connectionId) |
471 | 0 | continue; |
472 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "Detach receive-connection %S %u", |
473 | 0 | cm->protocol, (unsigned)connectionId); |
474 | 0 | c->recvChannels[i] = 0; |
475 | 0 | c->recvChannelsSize--; |
476 | 0 | return; |
477 | 0 | } |
478 | 0 | } |
479 | | |
480 | | static UA_StatusCode |
481 | | UA_PubSubConnection_attachSendConnection(UA_PubSubManager *psm, |
482 | | UA_ConnectionManager *cm, |
483 | | UA_PubSubConnection *c, |
484 | 0 | uintptr_t connectionId) { |
485 | 0 | if(c->sendChannel != 0 && c->sendChannel != connectionId) |
486 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
487 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "Attach send-connection %S %u", |
488 | 0 | cm->protocol, (unsigned)connectionId); |
489 | 0 | c->sendChannel = connectionId; |
490 | 0 | return UA_STATUSCODE_GOOD; |
491 | 0 | } |
492 | | |
493 | | static UA_StatusCode |
494 | | UA_PubSubConnection_attachRecvConnection(UA_PubSubManager *psm, |
495 | | UA_ConnectionManager *cm, |
496 | | UA_PubSubConnection *c, |
497 | 0 | uintptr_t connectionId) { |
498 | 0 | for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { |
499 | 0 | if(c->recvChannels[i] == connectionId) |
500 | 0 | return UA_STATUSCODE_GOOD; |
501 | 0 | } |
502 | 0 | if(c->recvChannelsSize >= UA_PUBSUB_MAXCHANNELS) |
503 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
504 | 0 | for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { |
505 | 0 | if(c->recvChannels[i] != 0) |
506 | 0 | continue; |
507 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, "Attach receive-connection %S %u", |
508 | 0 | cm->protocol, (unsigned)connectionId); |
509 | 0 | c->recvChannels[i] = connectionId; |
510 | 0 | c->recvChannelsSize++; |
511 | 0 | break; |
512 | 0 | } |
513 | 0 | return UA_STATUSCODE_GOOD; |
514 | 0 | } |
515 | | |
516 | | static void |
517 | 0 | UA_PubSubConnection_disconnect(UA_PubSubConnection *c) { |
518 | 0 | if(!c->cm) |
519 | 0 | return; |
520 | 0 | if(c->sendChannel != 0) |
521 | 0 | c->cm->closeConnection(c->cm, c->sendChannel); |
522 | 0 | for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { |
523 | 0 | if(c->recvChannels[i] != 0) |
524 | 0 | c->cm->closeConnection(c->cm, c->recvChannels[i]); |
525 | 0 | } |
526 | 0 | } |
527 | | |
528 | | static void |
529 | | PubSubChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, |
530 | | void *application, void **connectionContext, |
531 | | UA_ConnectionState state, const UA_KeyValueMap *params, |
532 | 0 | UA_ByteString msg, UA_Boolean recv) { |
533 | 0 | if(!connectionContext) |
534 | 0 | return; |
535 | | |
536 | | /* Get the context pointers */ |
537 | 0 | UA_PubSubConnection *psc = (UA_PubSubConnection*)*connectionContext; |
538 | 0 | UA_PubSubManager *psm = (UA_PubSubManager*)application; |
539 | 0 | UA_Server *server = psm->sc.server; |
540 | |
|
541 | 0 | UA_LOG_TRACE_PUBSUB(psm->logging, psc, |
542 | 0 | "Connection Callback with state %i", state); |
543 | |
|
544 | 0 | lockServer(server); |
545 | | |
546 | | /* The connection is closing in the EventLoop. This is the last callback |
547 | | * from that connection. Clean up the SecureChannel in the client. */ |
548 | 0 | if(state == UA_CONNECTIONSTATE_CLOSING) { |
549 | | /* Reset the connection identifiers */ |
550 | 0 | UA_PubSubConnection_detachConnection(psm, cm, psc, connectionId); |
551 | | |
552 | | /* PSC marked for deletion and the last EventLoop connection has closed */ |
553 | 0 | if(psc->deleteFlag && psc->recvChannelsSize == 0 && psc->sendChannel == 0) { |
554 | 0 | UA_PubSubConnection_delete(psm, psc); |
555 | 0 | unlockServer(server); |
556 | 0 | return; |
557 | 0 | } |
558 | | |
559 | | /* Reconnect automatically if the connection was operational. This sets |
560 | | * the connection state if connecting fails. Attention! If there are |
561 | | * several send or recv channels, then the connection is only reopened if |
562 | | * all of them close - which is usually the case. */ |
563 | 0 | if(psc->head.state == UA_PUBSUBSTATE_OPERATIONAL) |
564 | 0 | UA_PubSubConnection_connect(psm, psc, false); |
565 | | |
566 | | /* Switch the psm state from stopping to stopped once the last |
567 | | * connection has closed */ |
568 | 0 | UA_PubSubManager_setState(psm, psm->sc.state); |
569 | |
|
570 | 0 | unlockServer(server); |
571 | 0 | return; |
572 | 0 | } |
573 | | |
574 | | /* Store the connectionId (if a new connection) */ |
575 | 0 | UA_StatusCode res = (recv) ? |
576 | 0 | UA_PubSubConnection_attachRecvConnection(psm, cm, psc, connectionId) : |
577 | 0 | UA_PubSubConnection_attachSendConnection(psm, cm, psc, connectionId); |
578 | 0 | if(res != UA_STATUSCODE_GOOD) { |
579 | 0 | UA_LOG_WARNING_PUBSUB(psm->logging, psc, |
580 | 0 | "No more space for an additional EventLoop connection"); |
581 | 0 | if(psc->cm) |
582 | 0 | psc->cm->closeConnection(psc->cm, connectionId); |
583 | 0 | unlockServer(server); |
584 | 0 | return; |
585 | 0 | } |
586 | | |
587 | | /* Connection open, set to operational if not already done */ |
588 | 0 | UA_PubSubConnection_setPubSubState(psm, psc, psc->head.state); |
589 | | |
590 | | /* Message received */ |
591 | 0 | if(UA_LIKELY(recv && msg.length > 0)) |
592 | 0 | UA_PubSubConnection_process(psm, psc, msg); |
593 | | |
594 | 0 | unlockServer(server); |
595 | 0 | } |
596 | | |
597 | | static void |
598 | | PubSubRecvChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, |
599 | | void *application, void **connectionContext, |
600 | | UA_ConnectionState state, const UA_KeyValueMap *params, |
601 | 0 | UA_ByteString msg) { |
602 | 0 | PubSubChannelCallback(cm, connectionId, application, connectionContext, |
603 | 0 | state, params, msg, true); |
604 | 0 | } |
605 | | |
606 | | static void |
607 | | PubSubSendChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, |
608 | | void *application, void **connectionContext, |
609 | | UA_ConnectionState state, const UA_KeyValueMap *params, |
610 | 0 | UA_ByteString msg) { |
611 | 0 | PubSubChannelCallback(cm, connectionId, application, connectionContext, |
612 | 0 | state, params, msg, false); |
613 | 0 | } |
614 | | |
615 | | static UA_StatusCode |
616 | | UA_PubSubConnection_connectUDP(UA_PubSubManager *psm, UA_PubSubConnection *c, |
617 | 0 | UA_Boolean validate) { |
618 | 0 | UA_LOCK_ASSERT(&psm->sc.server->serviceMutex); |
619 | |
|
620 | 0 | UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*) |
621 | 0 | c->config.address.data; |
622 | | |
623 | | /* Extract hostname and port */ |
624 | 0 | UA_String address; |
625 | 0 | UA_UInt16 port; |
626 | 0 | UA_StatusCode res = UA_parseEndpointUrl(&addressUrl->url, &address, &port, NULL); |
627 | 0 | if(res != UA_STATUSCODE_GOOD) { |
628 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not parse the UDP network URL"); |
629 | 0 | return res; |
630 | 0 | } |
631 | | |
632 | | /* Detect a wildcard address for unicast receiving. The individual |
633 | | * DataSetWriters then contain additional target hostnames for sending. |
634 | | * |
635 | | * "localhost" and the empty hostname are used as a special "receive all" |
636 | | * wildcard for PubSub UDP. All other addresses (also the 127.0.0/8 and ::1 |
637 | | * range) are handled differently. For them we only receive messages that |
638 | | * originate from these addresses. |
639 | | * |
640 | | * The EventLoop backend detects whether an address is multicast capable and |
641 | | * registers it for the multicast group in the background. */ |
642 | 0 | UA_String localhostAddr = UA_STRING_STATIC("localhost"); |
643 | 0 | UA_Boolean receive_all = |
644 | 0 | (address.length == 0) || UA_String_equal(&localhostAddr, &address); |
645 | | |
646 | | /* Set up the connection parameters */ |
647 | 0 | UA_Boolean listen = true; |
648 | 0 | UA_Boolean reuse = true; |
649 | 0 | UA_Boolean loopback = true; |
650 | 0 | UA_KeyValuePair kvp[7]; |
651 | 0 | UA_KeyValueMap kvm = {5, kvp}; |
652 | 0 | kvp[0].key = UA_QUALIFIEDNAME(0, "port"); |
653 | 0 | UA_Variant_setScalar(&kvp[0].value, &port, &UA_TYPES[UA_TYPES_UINT16]); |
654 | 0 | kvp[1].key = UA_QUALIFIEDNAME(0, "listen"); |
655 | 0 | UA_Variant_setScalar(&kvp[1].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]); |
656 | 0 | kvp[2].key = UA_QUALIFIEDNAME(0, "validate"); |
657 | 0 | UA_Variant_setScalar(&kvp[2].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]); |
658 | 0 | kvp[3].key = UA_QUALIFIEDNAME(0, "reuse"); |
659 | 0 | UA_Variant_setScalar(&kvp[3].value, &reuse, &UA_TYPES[UA_TYPES_BOOLEAN]); |
660 | 0 | kvp[4].key = UA_QUALIFIEDNAME(0, "loopback"); |
661 | 0 | UA_Variant_setScalar(&kvp[4].value, &loopback, &UA_TYPES[UA_TYPES_BOOLEAN]); |
662 | 0 | if(!receive_all) { |
663 | | /* The "receive all" wildcard is different in the eventloop UDP layer. |
664 | | * Omit the address entirely to receive all.*/ |
665 | 0 | kvp[5].key = UA_QUALIFIEDNAME(0, "address"); |
666 | 0 | UA_Variant_setScalar(&kvp[5].value, &address, &UA_TYPES[UA_TYPES_STRING]); |
667 | 0 | kvm.mapSize++; |
668 | 0 | } |
669 | 0 | if(!UA_String_isEmpty(&addressUrl->networkInterface)) { |
670 | 0 | kvp[kvm.mapSize].key = UA_QUALIFIEDNAME(0, "interface"); |
671 | 0 | UA_Variant_setScalar(&kvp[kvm.mapSize].value, &addressUrl->networkInterface, |
672 | 0 | &UA_TYPES[UA_TYPES_STRING]); |
673 | 0 | kvm.mapSize++; |
674 | 0 | } |
675 | | |
676 | | /* Open a recv connection */ |
677 | 0 | if(validate || (c->recvChannelsSize == 0 && c->readerGroupsSize > 0)) { |
678 | 0 | res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubRecvChannelCallback); |
679 | 0 | if(res != UA_STATUSCODE_GOOD) { |
680 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, |
681 | 0 | "Could not open an UDP channel for receiving"); |
682 | 0 | return res; |
683 | 0 | } |
684 | 0 | } |
685 | | |
686 | | /* Receive all -- sending is handled in the DataSetWriter */ |
687 | 0 | if(receive_all) { |
688 | 0 | UA_LOG_INFO_PUBSUB(psm->logging, c, |
689 | 0 | "Localhost address - don't open UDP send connection"); |
690 | 0 | return UA_STATUSCODE_GOOD; |
691 | 0 | } |
692 | | |
693 | | /* Open a send connection */ |
694 | 0 | if(validate || (c->sendChannel == 0 && c->writerGroupsSize > 0)) { |
695 | 0 | listen = false; |
696 | 0 | res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubSendChannelCallback); |
697 | 0 | if(res != UA_STATUSCODE_GOOD) { |
698 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not open an UDP recv channel"); |
699 | 0 | return res; |
700 | 0 | } |
701 | 0 | } |
702 | | |
703 | 0 | return UA_STATUSCODE_GOOD; |
704 | 0 | } |
705 | | |
706 | | static UA_StatusCode |
707 | | UA_PubSubConnection_connectETH(UA_PubSubManager *psm, UA_PubSubConnection *c, |
708 | 0 | UA_Boolean validate) { |
709 | 0 | UA_LOCK_ASSERT(&psm->sc.server->serviceMutex); |
710 | |
|
711 | 0 | UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*) |
712 | 0 | c->config.address.data; |
713 | | |
714 | | /* Extract hostname and port */ |
715 | 0 | UA_String address; |
716 | |
|
717 | 0 | UA_UInt16 vid = 0; |
718 | 0 | UA_Byte pcp = 0; |
719 | |
|
720 | 0 | UA_StatusCode res = UA_parseEndpointUrlEthernet(&addressUrl->url, &address, &vid, &pcp); |
721 | 0 | if(res != UA_STATUSCODE_GOOD) { |
722 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not parse the ETH network URL"); |
723 | 0 | return res; |
724 | 0 | } |
725 | | |
726 | 0 | UA_Boolean listen = true; |
727 | 0 | UA_KeyValuePair kvp[7]; |
728 | 0 | UA_KeyValueMap kvm = {7, kvp}; |
729 | | |
730 | 0 | kvp[0].key = UA_QUALIFIEDNAME(0, "address"); |
731 | 0 | UA_Variant_setScalar(&kvp[0].value, &address, &UA_TYPES[UA_TYPES_STRING]); |
732 | 0 | kvp[1].key = UA_QUALIFIEDNAME(0, "listen"); |
733 | 0 | UA_Variant_setScalar(&kvp[1].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]); |
734 | 0 | kvp[2].key = UA_QUALIFIEDNAME(0, "interface"); |
735 | 0 | UA_Variant_setScalar(&kvp[2].value, &addressUrl->networkInterface, |
736 | 0 | &UA_TYPES[UA_TYPES_STRING]); |
737 | 0 | kvp[3].key = UA_QUALIFIEDNAME(0, "validate"); |
738 | 0 | UA_Variant_setScalar(&kvp[3].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]); |
739 | 0 | UA_UInt16 ether_type = 0xB62C; |
740 | 0 | kvp[4].key = UA_QUALIFIEDNAME(0, "ethertype"); |
741 | 0 | UA_Variant_setScalar(&kvp[4].value, ðer_type, &UA_TYPES[UA_TYPES_UINT16]); |
742 | | |
743 | 0 | kvp[5].key = UA_QUALIFIEDNAME(0,"vid"); |
744 | 0 | UA_Variant_setScalar(&kvp[5].value, &vid, &UA_TYPES[UA_TYPES_UINT16]); |
745 | |
|
746 | 0 | kvp[6].key = UA_QUALIFIEDNAME(0,"pcp"); |
747 | 0 | UA_Variant_setScalar(&kvp[6].value, &pcp, &UA_TYPES[UA_TYPES_BYTE]); |
748 | | |
749 | | /* Open recv channels */ |
750 | 0 | if(validate || (c->recvChannelsSize == 0 && c->readerGroupsSize > 0)) { |
751 | 0 | res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubRecvChannelCallback); |
752 | 0 | if(res != UA_STATUSCODE_GOOD) { |
753 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not open an ETH recv channel"); |
754 | 0 | return res; |
755 | 0 | } |
756 | 0 | } |
757 | | |
758 | | /* Open send channels */ |
759 | 0 | if(validate || (c->sendChannel == 0 && c->writerGroupsSize > 0)) { |
760 | 0 | listen = false; |
761 | 0 | res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubSendChannelCallback); |
762 | 0 | if(res != UA_STATUSCODE_GOOD) { |
763 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, |
764 | 0 | "Could not open an ETH channel for sending"); |
765 | 0 | } |
766 | 0 | } |
767 | |
|
768 | 0 | return res; |
769 | 0 | } |
770 | | |
771 | | static UA_Boolean |
772 | 0 | UA_PubSubConnection_canConnect(UA_PubSubConnection *c) { |
773 | 0 | if(c->sendChannel == 0 && c->writerGroupsSize > 0) |
774 | 0 | return true; |
775 | 0 | if(c->recvChannelsSize == 0 && c->readerGroupsSize > 0) |
776 | 0 | return true; |
777 | 0 | return false; |
778 | 0 | } |
779 | | |
780 | | static UA_StatusCode |
781 | | UA_PubSubConnection_connect(UA_PubSubManager *psm, UA_PubSubConnection *c, |
782 | 0 | UA_Boolean validate) { |
783 | 0 | UA_LOCK_ASSERT(&psm->sc.server->serviceMutex); |
784 | |
|
785 | 0 | UA_EventLoop *el = psm->sc.server->config.eventLoop; |
786 | 0 | if(!el) { |
787 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "No EventLoop configured"); |
788 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
789 | 0 | } |
790 | | |
791 | | /* Look up the connection manager for the connection */ |
792 | 0 | ConnectionProfileMapping *profile = NULL; |
793 | 0 | for(size_t i = 0; i < UA_PUBSUB_PROFILES_SIZE; i++) { |
794 | 0 | if(!UA_String_equal(&c->config.transportProfileUri, |
795 | 0 | &connectionProfiles[i].profileURI)) |
796 | 0 | continue; |
797 | 0 | profile = &connectionProfiles[i]; |
798 | 0 | break; |
799 | 0 | } |
800 | |
|
801 | 0 | UA_ConnectionManager *cm = (profile) ? getCM(el, profile->protocol) : NULL; |
802 | 0 | if(!cm || (c->cm && cm != c->cm)) { |
803 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, |
804 | 0 | "The requested profile \"%S\"is not supported", |
805 | 0 | c->config.transportProfileUri); |
806 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
807 | 0 | } |
808 | | |
809 | | /* Check the configuration address type */ |
810 | 0 | if(!UA_Variant_hasScalarType(&c->config.address, |
811 | 0 | &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])) { |
812 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "No NetworkAddressUrlDataType " |
813 | 0 | "for the address configuration"); |
814 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
815 | 0 | } |
816 | | |
817 | | /* Update the connection settings from the profile information */ |
818 | 0 | c->cm = cm; |
819 | 0 | c->json = profile->json; |
820 | | |
821 | | /* Some protocols (such as MQTT) don't connect at this level */ |
822 | 0 | return (profile->connect) ? |
823 | 0 | profile->connect(psm, c, validate) : UA_STATUSCODE_GOOD; |
824 | 0 | } |
825 | | |
826 | | /**************/ |
827 | | /* Server API */ |
828 | | /**************/ |
829 | | |
830 | | UA_StatusCode |
831 | | UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection, |
832 | 0 | UA_PubSubConnectionConfig *config) { |
833 | 0 | if(!server || !config) |
834 | 0 | return UA_STATUSCODE_BADINVALIDARGUMENT; |
835 | 0 | lockServer(server); |
836 | 0 | UA_PubSubConnection *c = UA_PubSubConnection_find(getPSM(server), connection); |
837 | 0 | UA_StatusCode res = (c) ? |
838 | 0 | UA_PubSubConnectionConfig_copy(&c->config, config) : UA_STATUSCODE_BADNOTFOUND; |
839 | 0 | unlockServer(server); |
840 | 0 | return res; |
841 | 0 | } |
842 | | |
843 | | UA_StatusCode |
844 | | UA_Server_addPubSubConnection(UA_Server *server, |
845 | | const UA_PubSubConnectionConfig *cc, |
846 | 0 | UA_NodeId *cId) { |
847 | 0 | if(!server || !cc) |
848 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
849 | 0 | lockServer(server); |
850 | 0 | UA_PubSubManager *psm = getPSM(server); |
851 | 0 | UA_StatusCode res = (psm) ? |
852 | 0 | UA_PubSubConnection_create(psm, cc, cId) : UA_STATUSCODE_BADINTERNALERROR; |
853 | 0 | unlockServer(server); |
854 | 0 | return res; |
855 | 0 | } |
856 | | |
857 | | UA_StatusCode |
858 | 0 | UA_Server_removePubSubConnection(UA_Server *server, const UA_NodeId cId) { |
859 | 0 | if(!server) |
860 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
861 | 0 | lockServer(server); |
862 | 0 | UA_PubSubManager *psm = getPSM(server); |
863 | 0 | UA_PubSubConnection *c = UA_PubSubConnection_find(psm, cId); |
864 | 0 | if(!c) { |
865 | 0 | unlockServer(server); |
866 | 0 | return UA_STATUSCODE_BADNOTFOUND; |
867 | 0 | } |
868 | 0 | UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED); |
869 | 0 | UA_PubSubConnection_delete(psm, c); |
870 | 0 | unlockServer(server); |
871 | 0 | return UA_STATUSCODE_GOOD; |
872 | 0 | } |
873 | | |
874 | | UA_StatusCode |
875 | 0 | UA_Server_enablePubSubConnection(UA_Server *server, const UA_NodeId cId) { |
876 | 0 | if(!server) |
877 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
878 | 0 | lockServer(server); |
879 | 0 | UA_PubSubManager *psm = getPSM(server); |
880 | 0 | UA_StatusCode res = (psm) ? |
881 | 0 | enablePubSubConnection(psm, cId) : UA_STATUSCODE_BADINTERNALERROR; |
882 | 0 | unlockServer(server); |
883 | 0 | return res; |
884 | 0 | } |
885 | | |
886 | | UA_StatusCode |
887 | 0 | UA_Server_disablePubSubConnection(UA_Server *server, const UA_NodeId cId) { |
888 | 0 | if(!server) |
889 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
890 | 0 | lockServer(server); |
891 | 0 | UA_PubSubManager *psm = getPSM(server); |
892 | 0 | UA_StatusCode res = (psm) ? |
893 | 0 | disablePubSubConnection(psm, cId) : UA_STATUSCODE_BADINTERNALERROR; |
894 | 0 | unlockServer(server); |
895 | 0 | return res; |
896 | 0 | } |
897 | | |
898 | | UA_StatusCode |
899 | | UA_Server_processPubSubConnectionReceive(UA_Server *server, |
900 | | const UA_NodeId connectionId, |
901 | 0 | const UA_ByteString packet) { |
902 | 0 | if(!server) |
903 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
904 | 0 | lockServer(server); |
905 | 0 | UA_StatusCode res = UA_STATUSCODE_BADINTERNALERROR; |
906 | 0 | UA_PubSubManager *psm = getPSM(server); |
907 | 0 | if(psm) { |
908 | 0 | UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId); |
909 | 0 | if(c) { |
910 | 0 | res = UA_STATUSCODE_GOOD; |
911 | 0 | UA_PubSubConnection_process(psm, c, packet); |
912 | 0 | } else { |
913 | 0 | res = UA_STATUSCODE_BADNOTFOUND; |
914 | 0 | } |
915 | 0 | } |
916 | 0 | unlockServer(server); |
917 | 0 | return res; |
918 | 0 | } |
919 | | |
920 | | UA_StatusCode |
921 | | UA_Server_updatePubSubConnectionConfig(UA_Server *server, |
922 | | const UA_NodeId connectionId, |
923 | 0 | const UA_PubSubConnectionConfig *config) { |
924 | 0 | if(!server || !config) |
925 | 0 | return UA_STATUSCODE_BADINVALIDARGUMENT; |
926 | | |
927 | 0 | lockServer(server); |
928 | | |
929 | | /* Find the connection */ |
930 | 0 | UA_PubSubManager *psm = getPSM(server); |
931 | 0 | UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId); |
932 | 0 | if(!c) { |
933 | 0 | unlockServer(server); |
934 | 0 | return UA_STATUSCODE_BADNOTFOUND; |
935 | 0 | } |
936 | | |
937 | | /* Verify the connection is disabled */ |
938 | 0 | if(UA_PubSubState_isEnabled(c->head.state)) { |
939 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, |
940 | 0 | "The PubSubConnection must be disabled to update the config"); |
941 | 0 | unlockServer(server); |
942 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
943 | 0 | } |
944 | | |
945 | | /* Store the old config */ |
946 | 0 | UA_PubSubConnectionConfig oldConfig = c->config; |
947 | 0 | memset(&c->config, 0, sizeof(UA_PubSubConnectionConfig)); |
948 | | |
949 | | /* Copy the connection config */ |
950 | 0 | UA_StatusCode res = UA_PubSubConnectionConfig_copy(config, &c->config); |
951 | 0 | if(res != UA_STATUSCODE_GOOD) |
952 | 0 | goto errout; |
953 | | |
954 | | /* Validate-connect to check the parameters */ |
955 | 0 | res = UA_PubSubConnection_connect(psm, c, true); |
956 | 0 | if(res != UA_STATUSCODE_GOOD) { |
957 | 0 | UA_LOG_ERROR_PUBSUB(psm->logging, c, "The connection parameters did not validate"); |
958 | 0 | goto errout; |
959 | 0 | } |
960 | | |
961 | 0 | UA_PubSubConnectionConfig_clear(&oldConfig); |
962 | 0 | unlockServer(server); |
963 | 0 | return UA_STATUSCODE_GOOD; |
964 | | |
965 | 0 | errout: |
966 | | /* Restore the old config */ |
967 | 0 | UA_PubSubConnectionConfig_clear(&c->config); |
968 | 0 | c->config = oldConfig; |
969 | 0 | unlockServer(server); |
970 | 0 | return res; |
971 | 0 | } |
972 | | |
973 | | #endif /* UA_ENABLE_PUBSUB */ |