Coverage Report

Created: 2026-05-30 06:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541_15/src/pubsub/ua_pubsub_connection.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 * Copyright (c) 2017-2025 Fraunhofer IOSB (Author: Andreas Ebner)
6
 * Copyright (c) 2019, 2022, 2024 Fraunhofer IOSB (Author: Julius Pfrommer)
7
 * Copyright (c) 2019 Kalycito Infotech Private Limited
8
 * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes)
9
 * Copyright (c) 2022 Siemens AG (Author: Thomas Fischer)
10
 * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf)
11
 */
12
13
#include "ua_pubsub_internal.h"
14
15
#ifdef UA_ENABLE_PUBSUB /* conditional compilation */
16
17
static UA_Boolean
18
UA_PubSubConnection_canConnect(UA_PubSubConnection *c);
19
20
static UA_StatusCode
21
UA_PubSubConnection_connect(UA_PubSubManager *psm, UA_PubSubConnection *c,
22
                            UA_Boolean validate);
23
24
static void
25
UA_PubSubConnection_process(UA_PubSubManager *psm, UA_PubSubConnection *c,
26
                            const UA_ByteString msg);
27
28
static void
29
UA_PubSubConnection_disconnect(UA_PubSubConnection *c);
30
31
UA_StatusCode
32
UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
33
4.01k
                               UA_PubSubConnectionConfig *dst) {
34
4.01k
    UA_StatusCode res = UA_STATUSCODE_GOOD;
35
4.01k
    memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
36
4.01k
    res |= UA_PublisherId_copy(&src->publisherId, &dst->publisherId);
37
4.01k
    res |= UA_String_copy(&src->name, &dst->name);
38
4.01k
    res |= UA_Variant_copy(&src->address, &dst->address);
39
4.01k
    res |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
40
4.01k
    res |= UA_Variant_copy(&src->connectionTransportSettings,
41
4.01k
                           &dst->connectionTransportSettings);
42
4.01k
    res |= UA_KeyValueMap_copy(&src->connectionProperties,
43
4.01k
                               &dst->connectionProperties);
44
4.01k
    if(res != UA_STATUSCODE_GOOD)
45
0
        UA_PubSubConnectionConfig_clear(dst);
46
4.01k
    return res;
47
4.01k
}
48
49
UA_PubSubConnection *
50
0
UA_PubSubConnection_find(UA_PubSubManager *psm, const UA_NodeId id) {
51
0
    if(!psm)
52
0
        return NULL;
53
0
    UA_PubSubConnection *c;
54
0
    TAILQ_FOREACH(c, &psm->connections, listEntry) {
55
0
        if(UA_NodeId_equal(&id, &c->head.identifier))
56
0
            break;
57
0
    }
58
0
    return c;
59
0
}
60
61
void
62
8.02k
UA_PubSubConnectionConfig_clear(UA_PubSubConnectionConfig *connectionConfig) {
63
8.02k
    UA_PublisherId_clear(&connectionConfig->publisherId);
64
8.02k
    UA_String_clear(&connectionConfig->name);
65
8.02k
    UA_String_clear(&connectionConfig->transportProfileUri);
66
8.02k
    UA_Variant_clear(&connectionConfig->connectionTransportSettings);
67
8.02k
    UA_Variant_clear(&connectionConfig->address);
68
8.02k
    UA_KeyValueMap_clear(&connectionConfig->connectionProperties);
69
8.02k
}
70
71
UA_StatusCode
72
UA_PubSubConnection_create(UA_PubSubManager *psm, const UA_PubSubConnectionConfig *cc,
73
0
                           UA_NodeId *cId) {
74
    /* Allocate */
75
0
    UA_PubSubConnection *c = (UA_PubSubConnection*)
76
0
        UA_calloc(1, sizeof(UA_PubSubConnection));
77
0
    if(!c)
78
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
79
80
0
    c->head.componentType = UA_PUBSUBCOMPONENT_CONNECTION;
81
82
    /* Copy the connection config */
83
0
    UA_StatusCode ret = UA_PubSubConnectionConfig_copy(cc, &c->config);
84
0
    UA_CHECK_STATUS(ret, UA_free(c); return ret);
85
86
    /* Assign the connection identifier */
87
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
88
    /* Internally create a unique id */
89
0
    addPubSubConnectionRepresentation(psm->sc.server, c);
90
#else
91
    /* Create a unique NodeId that does not correspond to a Node */
92
    UA_PubSubManager_generateUniqueNodeId(psm, &c->head.identifier);
93
#endif
94
95
    /* Register */
96
0
    TAILQ_INSERT_HEAD(&psm->connections, c, listEntry);
97
0
    psm->connectionsSize++;
98
99
    /* Validate-connect to check the parameters */
100
0
    ret = UA_PubSubConnection_connect(psm, c, true);
101
0
    if(ret != UA_STATUSCODE_GOOD) {
102
0
        UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB,
103
0
                     "Could not create the PubSubConnection. "
104
0
                     "The connection parameters did not validate.");
105
0
        UA_PubSubConnection_delete(psm, c);
106
0
        return ret;
107
0
    }
108
109
    /* Cache the log string */
110
0
    char tmpLogIdStr[128];
111
0
    mp_snprintf(tmpLogIdStr, 128, "PubSubConnection %N\t| ", c->head.identifier);
112
0
    c->head.logIdString = UA_STRING_ALLOC(tmpLogIdStr);
113
114
    /* Notify the application that a new Connection was created.
115
     * This may internally adjust the config */
116
0
    UA_Server *server = psm->sc.server;
117
0
    if(server->config.pubSubConfig.componentLifecycleCallback) {
118
0
        UA_StatusCode res = server->config.pubSubConfig.
119
0
            componentLifecycleCallback(server, c->head.identifier,
120
0
                                       UA_PUBSUBCOMPONENT_CONNECTION, false);
121
0
        if(res != UA_STATUSCODE_GOOD) {
122
0
            UA_PubSubConnection_delete(psm, c);
123
0
            return res;
124
0
        }
125
0
    }
126
127
0
    UA_LOG_INFO_PUBSUB(psm->logging, c, "Connection created (State: %s)",
128
0
                       UA_PubSubState_name(c->head.state));
129
130
    /* Copy the created NodeId to the output. Cannot fail as we create a
131
     * numerical NodeId. */
132
0
    if(cId)
133
0
        UA_NodeId_copy(&c->head.identifier, cId);
134
135
    /* Enable the Connection immediately if the enabled flag is set */
136
0
    if(cc->enabled)
137
0
        UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_OPERATIONAL);
138
139
0
    return UA_STATUSCODE_GOOD;
140
0
}
141
142
static void
143
0
delayedPubSubConnection_delete(void *application, void *context) {
144
0
    UA_PubSubManager *psm = (UA_PubSubManager*)application;
145
0
    UA_Server *server = psm->sc.server;
146
0
    UA_PubSubConnection *c = (UA_PubSubConnection*)context;
147
0
    lockServer(server);
148
0
    UA_PubSubConnection_delete(psm, c);
149
0
    unlockServer(server);
150
0
}
151
152
/* Clean up the PubSubConnection. If no EventLoop connection is attached we can
153
 * immediately free. Otherwise we close the EventLoop connections and free in
154
 * the connection callback. */
155
UA_StatusCode
156
0
UA_PubSubConnection_delete(UA_PubSubManager *psm, UA_PubSubConnection *c) {
157
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
158
159
    /* Check with the application if we can remove */
160
0
    UA_Server *server = psm->sc.server;
161
0
    if(server->config.pubSubConfig.componentLifecycleCallback) {
162
0
        UA_StatusCode res = server->config.pubSubConfig.
163
0
            componentLifecycleCallback(server, c->head.identifier,
164
0
                                       UA_PUBSUBCOMPONENT_CONNECTION, true);
165
0
        if(res != UA_STATUSCODE_GOOD)
166
0
            return res;
167
0
    }
168
169
    /* Disable (and disconnect) and set the deleteFlag. This prevents a
170
     * reconnect and triggers the deletion when the last open socket is
171
     * closed. */
172
0
    c->deleteFlag = true;
173
0
    UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED);
174
175
    /* Stop and all ReaderGroupds and WriterGroups attached to the Connection.
176
     * We need to disable all to remove the Connection.*/
177
0
    UA_ReaderGroup *rg, *tmpRg;
178
0
    LIST_FOREACH(rg, &c->readerGroups, listEntry) {
179
0
        UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_DISABLED);
180
0
    }
181
182
0
    UA_WriterGroup *wg, *tmpWg;
183
0
    LIST_FOREACH(wg, &c->writerGroups, listEntry) {
184
0
        UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_DISABLED);
185
0
    }
186
187
    /* Remove all ReaderGorups and WriterGroups */
188
0
    LIST_FOREACH_SAFE(rg, &c->readerGroups, listEntry, tmpRg) {
189
0
        UA_ReaderGroup_remove(psm, rg);
190
0
    }
191
192
0
    LIST_FOREACH_SAFE(wg, &c->writerGroups, listEntry, tmpWg) {
193
0
        UA_WriterGroup_remove(psm, wg);
194
0
    }
195
196
    /* Not all sockets are closed. This method will be called again */
197
0
    if(c->sendChannel != 0 || c->recvChannelsSize > 0)
198
0
        return UA_STATUSCODE_BADINTERNALERROR;
199
200
    /* The WriterGroups / ReaderGroups are not deleted. Try again in the next
201
     * iteration of the event loop.*/
202
0
    if(!LIST_EMPTY(&c->writerGroups) || !LIST_EMPTY(&c->readerGroups)) {
203
0
        UA_EventLoop *el = psm->sc.server->config.eventLoop;
204
0
        c->dc.callback = delayedPubSubConnection_delete;
205
0
        c->dc.application = psm;
206
0
        c->dc.context = c;
207
0
        el->addDelayedCallback(el, &c->dc);
208
0
        return UA_STATUSCODE_BADINTERNALERROR;
209
0
    }
210
211
    /* Remove from the information model */
212
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
213
0
    deleteNode(psm->sc.server, c->head.identifier, true);
214
0
#endif
215
216
    /* Unlink from the server */
217
0
    TAILQ_REMOVE(&psm->connections, c, listEntry);
218
0
    psm->connectionsSize--;
219
220
0
    UA_LOG_INFO_PUBSUB(psm->logging, c, "Connection deleted");
221
222
0
    UA_PubSubConnectionConfig_clear(&c->config);
223
0
    UA_PubSubComponentHead_clear(&c->head);
224
0
    UA_free(c);
225
226
0
    return UA_STATUSCODE_GOOD;
227
0
}
228
229
static void
230
UA_PubSubConnection_process(UA_PubSubManager *psm, UA_PubSubConnection *c,
231
0
                            const UA_ByteString msg) {
232
0
    UA_LOG_TRACE_PUBSUB(psm->logging, c, "Processing a received buffer");
233
234
#ifdef UA_DEBUG_DUMP_PKGS
235
    UA_dump_hex_pkg(msg.data, msg.length);
236
#endif
237
238
0
    UA_Boolean processed = false;
239
0
    UA_NetworkMessage nm;
240
0
    memset(&nm, 0, sizeof(UA_NetworkMessage));
241
242
    /* Decode the NetworkMessage with the first matching ReaderGroup */
243
0
    UA_ReaderGroup *rg;
244
0
    UA_StatusCode res = UA_STATUSCODE_BADNOTFOUND;
245
0
    LIST_FOREACH(rg, &c->readerGroups, listEntry) {
246
0
        if(rg->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
247
0
           rg->head.state != UA_PUBSUBSTATE_PREOPERATIONAL)
248
0
            continue;
249
0
        if(rg->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP) {
250
0
            res = UA_ReaderGroup_decodeNetworkMessage(psm, rg, msg, &nm);
251
0
        } else {
252
0
#ifdef UA_ENABLE_JSON_ENCODING
253
0
            res = UA_ReaderGroup_decodeNetworkMessageJSON(psm, rg, msg, &nm);
254
#else
255
            res = UA_STATUSCODE_BADNOTSUPPORTED;
256
            UA_LOG_WARNING_PUBSUB(psm->logging, c, "JSON support is not activated");
257
#endif
258
0
        }
259
0
        if(res == UA_STATUSCODE_GOOD)
260
0
            break;
261
0
    }
262
263
0
    if(res != UA_STATUSCODE_GOOD)
264
0
        goto finish;
265
266
    /* Process the received message for all ReaderGroups */
267
0
    LIST_FOREACH(rg, &c->readerGroups, listEntry) {
268
0
        if(rg->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
269
0
           rg->head.state != UA_PUBSUBSTATE_PREOPERATIONAL)
270
0
            continue;
271
0
        processed |= UA_ReaderGroup_process(psm, rg, &nm);
272
0
    }
273
0
    UA_NetworkMessage_clear(&nm);
274
275
0
 finish:
276
0
    if(!processed) {
277
0
        UA_DateTime nowM = UA_DateTime_nowMonotonic();
278
0
        if(c->silenceErrorUntil < nowM) {
279
0
            UA_LOG_WARNING_PUBSUB(psm->logging, c,
280
0
                                  "Message received that could not be processed "
281
0
                                  "with StatusCode %s. Check PublisherId, "
282
0
                                  "WriterGroupId and DatasetWriterId",
283
0
                                  UA_StatusCode_name(res));
284
0
            c->silenceErrorUntil = nowM + (UA_DateTime)(10.0 * UA_DATETIME_SEC);
285
0
        }
286
0
    }
287
0
}
288
289
UA_StatusCode
290
UA_PubSubConnection_setPubSubState(UA_PubSubManager *psm, UA_PubSubConnection *c,
291
0
                                   UA_PubSubState targetState) {
292
0
    if(c->deleteFlag && targetState != UA_PUBSUBSTATE_DISABLED) {
293
0
        UA_LOG_WARNING_PUBSUB(psm->logging, c,
294
0
                              "The connection is being deleted. Can only be disabled.");
295
0
        return UA_STATUSCODE_BADINTERNALERROR;
296
0
    }
297
298
    /* Callback to modify the WriterGroup config and change the targetState
299
     * before the state machine executes */
300
0
    UA_Server *server = psm->sc.server;
301
0
    if(server->config.pubSubConfig.beforeStateChangeCallback) {
302
0
        server->config.pubSubConfig.
303
0
            beforeStateChangeCallback(server, c->head.identifier, &targetState);
304
0
    }
305
306
    /* Are we doing a top-level state update or recursively? */
307
0
    UA_StatusCode ret = UA_STATUSCODE_GOOD;
308
0
    UA_PubSubState oldState = c->head.state;
309
0
    UA_Boolean isTransient = c->head.transientState;
310
0
    c->head.transientState = true;
311
312
    /* Custom state machine */
313
0
    if(c->config.customStateMachine) {
314
0
        ret = c->config.customStateMachine(server, c->head.identifier, c->config.context,
315
0
                                           &c->head.state, targetState);
316
0
        goto finalize_state_machine;
317
0
    }
318
319
    /* Internal state machine */
320
0
    switch(targetState) {
321
        /* Disabled or Error */
322
0
        case UA_PUBSUBSTATE_ERROR:
323
0
        case UA_PUBSUBSTATE_DISABLED:
324
0
            UA_PubSubConnection_disconnect(c);
325
0
            c->head.state = targetState;
326
0
            break;
327
328
0
        case UA_PUBSUBSTATE_PAUSED:
329
0
        case UA_PUBSUBSTATE_PREOPERATIONAL:
330
0
        case UA_PUBSUBSTATE_OPERATIONAL:
331
            /* Cannot go operational if the PubSubManager is not started */
332
0
            if(psm->sc.state != UA_LIFECYCLESTATE_STARTED) {
333
                /* Avoid repeat warnings */
334
0
                if(oldState != UA_PUBSUBSTATE_PAUSED) {
335
0
                    UA_LOG_WARNING_PUBSUB(psm->logging, c,
336
0
                                          "Cannot enable the connection while the "
337
0
                                          "server is not running -> Paused State");
338
0
                }
339
0
                c->head.state = UA_PUBSUBSTATE_PAUSED;
340
0
                UA_PubSubConnection_disconnect(c);
341
0
                break;
342
0
            }
343
344
0
            c->head.state = UA_PUBSUBSTATE_OPERATIONAL;
345
346
            /* Whether the connection needs to connect depends on whether a
347
             * ReaderGroup or WriterGroup is attached. If not, then we don't
348
             * open any connections. */
349
0
            if(UA_PubSubConnection_canConnect(c))
350
0
                ret = UA_PubSubConnection_connect(psm, c, false);
351
0
            break;
352
353
            /* Unknown case */
354
0
        default:
355
0
            ret = UA_STATUSCODE_BADINTERNALERROR;
356
0
            break;
357
0
    }
358
359
    /* Failure */
360
0
    if(ret != UA_STATUSCODE_GOOD) {
361
0
        c->head.state = UA_PUBSUBSTATE_ERROR;
362
0
        UA_PubSubConnection_disconnect(c);
363
0
    }
364
365
0
 finalize_state_machine:
366
367
    /* Only the top-level state update (if recursive calls are happening)
368
     * notifies the application and updates Reader and WriterGroups */
369
0
    c->head.transientState = isTransient;
370
0
    if(c->head.transientState)
371
0
        return ret;
372
373
    /* No state change has happened */
374
0
    if(c->head.state == oldState)
375
0
        return ret;
376
377
0
    UA_LOG_INFO_PUBSUB(psm->logging, c, "%s -> %s",
378
0
                       UA_PubSubState_name(oldState),
379
0
                       UA_PubSubState_name(c->head.state));
380
381
    /* Inform application about state change */
382
0
    if(server->config.pubSubConfig.stateChangeCallback) {
383
0
        server->config.pubSubConfig.
384
0
            stateChangeCallback(server, c->head.identifier, c->head.state, ret);
385
0
    }
386
387
    /* Children evaluate their state machine after the state change of the parent.
388
     * Keep the current child state as the target state for the child. */
389
0
    UA_ReaderGroup *rg;
390
0
    LIST_FOREACH(rg, &c->readerGroups, listEntry) {
391
0
        if(psm->pubSubInitialSetupMode && rg->config.enabled) {
392
0
            UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_OPERATIONAL);
393
0
        } else {
394
0
            UA_ReaderGroup_setPubSubState(psm, rg, rg->head.state);
395
0
        }
396
0
    }
397
0
    UA_WriterGroup *wg;
398
0
    LIST_FOREACH(wg, &c->writerGroups, listEntry) {
399
0
        if(psm->pubSubInitialSetupMode && wg->config.enabled) {
400
0
            UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_OPERATIONAL);
401
0
        } else {
402
0
            UA_WriterGroup_setPubSubState(psm, wg, wg->head.state);
403
0
        }
404
0
    }
405
406
    /* Update the PubSubManager state. It will go from STOPPING to STOPPED when
407
     * the last socket has closed. */
408
0
    UA_PubSubManager_setState(psm, psm->sc.state);
409
410
0
    return ret;
411
0
}
412
413
static UA_StatusCode
414
0
enablePubSubConnection(UA_PubSubManager *psm, const UA_NodeId connectionId) {
415
0
    UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId);
416
0
    return (c) ? UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_OPERATIONAL)
417
0
        : UA_STATUSCODE_BADNOTFOUND;
418
0
}
419
420
static UA_StatusCode
421
0
disablePubSubConnection(UA_PubSubManager *psm, const UA_NodeId connectionId) {
422
0
    UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId);
423
0
    return (c) ? UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED)
424
0
        : UA_STATUSCODE_BADNOTFOUND;
425
0
}
426
427
/***********************/
428
/* Connection Handling */
429
/***********************/
430
431
static UA_StatusCode
432
UA_PubSubConnection_connectUDP(UA_PubSubManager *psm, UA_PubSubConnection *c,
433
                               UA_Boolean validate);
434
435
static UA_StatusCode
436
UA_PubSubConnection_connectETH(UA_PubSubManager *psm, UA_PubSubConnection *c,
437
                               UA_Boolean validate);
438
439
typedef struct  {
440
    UA_String profileURI;
441
    UA_String protocol;
442
    UA_Boolean json;
443
    UA_StatusCode (*connect)(UA_PubSubManager *psm, UA_PubSubConnection *c,
444
                             UA_Boolean validate);
445
} ConnectionProfileMapping;
446
447
static ConnectionProfileMapping connectionProfiles[UA_PUBSUB_PROFILES_SIZE] = {
448
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp"),
449
     UA_STRING_STATIC("udp"), false, UA_PubSubConnection_connectUDP},
450
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"),
451
     UA_STRING_STATIC("mqtt"), false, NULL},
452
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"),
453
     UA_STRING_STATIC("mqtt"), true, NULL},
454
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp"),
455
     UA_STRING_STATIC("eth"), false, UA_PubSubConnection_connectETH}
456
};
457
458
static void
459
UA_PubSubConnection_detachConnection(UA_PubSubManager *psm,
460
                                     UA_ConnectionManager *cm,
461
                                     UA_PubSubConnection *c,
462
0
                                     uintptr_t connectionId) {
463
0
    if(c->sendChannel == connectionId) {
464
0
        UA_LOG_INFO_PUBSUB(psm->logging, c, "Detach send-connection %S %u",
465
0
                           cm->protocol, (unsigned)connectionId);
466
0
        c->sendChannel = 0;
467
0
        return;
468
0
    }
469
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
470
0
        if(c->recvChannels[i] != connectionId)
471
0
            continue;
472
0
        UA_LOG_INFO_PUBSUB(psm->logging, c, "Detach receive-connection %S %u",
473
0
                           cm->protocol, (unsigned)connectionId);
474
0
        c->recvChannels[i] = 0;
475
0
        c->recvChannelsSize--;
476
0
        return;
477
0
    }
478
0
}
479
480
static UA_StatusCode
481
UA_PubSubConnection_attachSendConnection(UA_PubSubManager *psm,
482
                                         UA_ConnectionManager *cm,
483
                                         UA_PubSubConnection *c,
484
0
                                         uintptr_t connectionId) {
485
0
    if(c->sendChannel != 0 && c->sendChannel != connectionId)
486
0
        return UA_STATUSCODE_BADINTERNALERROR;
487
0
    UA_LOG_INFO_PUBSUB(psm->logging, c, "Attach send-connection %S %u",
488
0
                       cm->protocol, (unsigned)connectionId);
489
0
    c->sendChannel = connectionId;
490
0
    return UA_STATUSCODE_GOOD;
491
0
}
492
493
static UA_StatusCode
494
UA_PubSubConnection_attachRecvConnection(UA_PubSubManager *psm,
495
                                         UA_ConnectionManager *cm,
496
                                         UA_PubSubConnection *c,
497
0
                                         uintptr_t connectionId) {
498
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
499
0
        if(c->recvChannels[i] == connectionId)
500
0
            return UA_STATUSCODE_GOOD;
501
0
    }
502
0
    if(c->recvChannelsSize >= UA_PUBSUB_MAXCHANNELS)
503
0
        return UA_STATUSCODE_BADINTERNALERROR;
504
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
505
0
        if(c->recvChannels[i] != 0)
506
0
            continue;
507
0
        UA_LOG_INFO_PUBSUB(psm->logging, c, "Attach receive-connection %S %u",
508
0
                           cm->protocol, (unsigned)connectionId);
509
0
        c->recvChannels[i] = connectionId;
510
0
        c->recvChannelsSize++;
511
0
        break;
512
0
    }
513
0
    return UA_STATUSCODE_GOOD;
514
0
}
515
516
static void
517
0
UA_PubSubConnection_disconnect(UA_PubSubConnection *c) {   
518
0
    if(!c->cm)
519
0
        return;
520
0
    if(c->sendChannel != 0)
521
0
        c->cm->closeConnection(c->cm, c->sendChannel);
522
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
523
0
        if(c->recvChannels[i] != 0)
524
0
            c->cm->closeConnection(c->cm, c->recvChannels[i]);
525
0
    }
526
0
}
527
528
static void
529
PubSubChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId,
530
                      void *application, void **connectionContext,
531
                      UA_ConnectionState state, const UA_KeyValueMap *params,
532
0
                      UA_ByteString msg, UA_Boolean recv) {
533
0
    if(!connectionContext)
534
0
        return;
535
536
    /* Get the context pointers */
537
0
    UA_PubSubConnection *psc = (UA_PubSubConnection*)*connectionContext;
538
0
    UA_PubSubManager *psm = (UA_PubSubManager*)application;
539
0
    UA_Server *server = psm->sc.server;
540
541
0
    UA_LOG_TRACE_PUBSUB(psm->logging, psc,
542
0
                        "Connection Callback with state %i", state);
543
544
0
    lockServer(server);
545
546
    /* The connection is closing in the EventLoop. This is the last callback
547
     * from that connection. Clean up the SecureChannel in the client. */
548
0
    if(state == UA_CONNECTIONSTATE_CLOSING) {
549
        /* Reset the connection identifiers */
550
0
        UA_PubSubConnection_detachConnection(psm, cm, psc, connectionId);
551
552
        /* PSC marked for deletion and the last EventLoop connection has closed */
553
0
        if(psc->deleteFlag && psc->recvChannelsSize == 0 && psc->sendChannel == 0) {
554
0
            UA_PubSubConnection_delete(psm, psc);
555
0
            unlockServer(server);
556
0
            return;
557
0
        }
558
559
        /* Reconnect automatically if the connection was operational. This sets
560
         * the connection state if connecting fails. Attention! If there are
561
         * several send or recv channels, then the connection is only reopened if
562
         * all of them close - which is usually the case. */
563
0
        if(psc->head.state == UA_PUBSUBSTATE_OPERATIONAL)
564
0
            UA_PubSubConnection_connect(psm, psc, false);
565
566
        /* Switch the psm state from stopping to stopped once the last
567
         * connection has closed */
568
0
        UA_PubSubManager_setState(psm, psm->sc.state);
569
570
0
        unlockServer(server);
571
0
        return;
572
0
    }
573
574
    /* Store the connectionId (if a new connection) */
575
0
    UA_StatusCode res = (recv) ?
576
0
        UA_PubSubConnection_attachRecvConnection(psm, cm, psc, connectionId) :
577
0
        UA_PubSubConnection_attachSendConnection(psm, cm, psc, connectionId);
578
0
    if(res != UA_STATUSCODE_GOOD) {
579
0
        UA_LOG_WARNING_PUBSUB(psm->logging, psc,
580
0
                              "No more space for an additional EventLoop connection");
581
0
        if(psc->cm)
582
0
            psc->cm->closeConnection(psc->cm, connectionId);
583
0
        unlockServer(server);
584
0
        return;
585
0
    }
586
587
    /* Connection open, set to operational if not already done */
588
0
    UA_PubSubConnection_setPubSubState(psm, psc, psc->head.state);
589
590
    /* Message received */
591
0
    if(UA_LIKELY(recv && msg.length > 0))
592
0
        UA_PubSubConnection_process(psm, psc, msg);
593
    
594
0
    unlockServer(server);
595
0
}
596
597
static void
598
PubSubRecvChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId,
599
                         void *application, void **connectionContext,
600
                         UA_ConnectionState state, const UA_KeyValueMap *params,
601
0
                         UA_ByteString msg) {
602
0
    PubSubChannelCallback(cm, connectionId, application, connectionContext,
603
0
                         state, params, msg, true);
604
0
}
605
606
static void
607
PubSubSendChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId,
608
                         void *application, void **connectionContext,
609
                         UA_ConnectionState state, const UA_KeyValueMap *params,
610
0
                         UA_ByteString msg) {
611
0
    PubSubChannelCallback(cm, connectionId, application, connectionContext,
612
0
                         state, params, msg, false);
613
0
}
614
615
static UA_StatusCode
616
UA_PubSubConnection_connectUDP(UA_PubSubManager *psm, UA_PubSubConnection *c,
617
0
                               UA_Boolean validate) {
618
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
619
620
0
    UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*)
621
0
        c->config.address.data;
622
623
    /* Extract hostname and port */
624
0
    UA_String address;
625
0
    UA_UInt16 port;
626
0
    UA_StatusCode res = UA_parseEndpointUrl(&addressUrl->url, &address, &port, NULL);
627
0
    if(res != UA_STATUSCODE_GOOD) {
628
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not parse the UDP network URL");
629
0
        return res;
630
0
    }
631
632
    /* Detect a wildcard address for unicast receiving. The individual
633
     * DataSetWriters then contain additional target hostnames for sending.
634
     *
635
     * "localhost" and the empty hostname are used as a special "receive all"
636
     * wildcard for PubSub UDP. All other addresses (also the 127.0.0/8 and ::1
637
     * range) are handled differently. For them we only receive messages that
638
     * originate from these addresses.
639
     *
640
     * The EventLoop backend detects whether an address is multicast capable and
641
     * registers it for the multicast group in the background. */
642
0
    UA_String localhostAddr = UA_STRING_STATIC("localhost");
643
0
    UA_Boolean receive_all =
644
0
        (address.length == 0) || UA_String_equal(&localhostAddr, &address);
645
646
    /* Set up the connection parameters */
647
0
    UA_Boolean listen = true;
648
0
    UA_Boolean reuse = true;
649
0
    UA_Boolean loopback = true;
650
0
    UA_KeyValuePair kvp[7];
651
0
    UA_KeyValueMap kvm = {5, kvp};
652
0
    kvp[0].key = UA_QUALIFIEDNAME(0, "port");
653
0
    UA_Variant_setScalar(&kvp[0].value, &port, &UA_TYPES[UA_TYPES_UINT16]);
654
0
    kvp[1].key = UA_QUALIFIEDNAME(0, "listen");
655
0
    UA_Variant_setScalar(&kvp[1].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]);
656
0
    kvp[2].key = UA_QUALIFIEDNAME(0, "validate");
657
0
    UA_Variant_setScalar(&kvp[2].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]);
658
0
    kvp[3].key = UA_QUALIFIEDNAME(0, "reuse");
659
0
    UA_Variant_setScalar(&kvp[3].value, &reuse, &UA_TYPES[UA_TYPES_BOOLEAN]);
660
0
    kvp[4].key = UA_QUALIFIEDNAME(0, "loopback");
661
0
    UA_Variant_setScalar(&kvp[4].value, &loopback, &UA_TYPES[UA_TYPES_BOOLEAN]);
662
0
    if(!receive_all) {
663
        /* The "receive all" wildcard is different in the eventloop UDP layer.
664
         * Omit the address entirely to receive all.*/
665
0
        kvp[5].key = UA_QUALIFIEDNAME(0, "address");
666
0
        UA_Variant_setScalar(&kvp[5].value, &address, &UA_TYPES[UA_TYPES_STRING]);
667
0
        kvm.mapSize++;
668
0
    }
669
0
    if(!UA_String_isEmpty(&addressUrl->networkInterface)) {
670
0
        kvp[kvm.mapSize].key = UA_QUALIFIEDNAME(0, "interface");
671
0
        UA_Variant_setScalar(&kvp[kvm.mapSize].value, &addressUrl->networkInterface,
672
0
                             &UA_TYPES[UA_TYPES_STRING]);
673
0
        kvm.mapSize++;
674
0
    }
675
676
    /* Open a recv connection */
677
0
    if(validate || (c->recvChannelsSize == 0 && c->readerGroupsSize > 0)) {
678
0
        res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubRecvChannelCallback);
679
0
        if(res != UA_STATUSCODE_GOOD) {
680
0
            UA_LOG_ERROR_PUBSUB(psm->logging, c,
681
0
                                "Could not open an UDP channel for receiving");
682
0
            return res;
683
0
        }
684
0
    }
685
686
    /* Receive all -- sending is handled in the DataSetWriter */
687
0
    if(receive_all) {
688
0
        UA_LOG_INFO_PUBSUB(psm->logging, c,
689
0
                           "Localhost address - don't open UDP send connection");
690
0
        return UA_STATUSCODE_GOOD;
691
0
    }
692
693
    /* Open a send connection */
694
0
    if(validate || (c->sendChannel == 0 && c->writerGroupsSize > 0)) {
695
0
        listen = false;
696
0
        res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubSendChannelCallback);
697
0
        if(res != UA_STATUSCODE_GOOD) {
698
0
            UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not open an UDP recv channel");
699
0
            return res;
700
0
        }
701
0
    }
702
703
0
    return UA_STATUSCODE_GOOD;
704
0
}
705
706
static UA_StatusCode
707
UA_PubSubConnection_connectETH(UA_PubSubManager *psm, UA_PubSubConnection *c,
708
0
                               UA_Boolean validate) {
709
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
710
711
0
    UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*)
712
0
        c->config.address.data;
713
714
    /* Extract hostname and port */
715
0
    UA_String address;
716
717
0
    UA_UInt16 vid = 0;
718
0
    UA_Byte pcp = 0;
719
720
0
    UA_StatusCode res = UA_parseEndpointUrlEthernet(&addressUrl->url, &address, &vid, &pcp);
721
0
    if(res != UA_STATUSCODE_GOOD) {
722
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not parse the ETH network URL");
723
0
        return res;
724
0
    }
725
726
0
    UA_Boolean listen = true;
727
0
    UA_KeyValuePair kvp[7];
728
0
    UA_KeyValueMap kvm = {7, kvp};
729
    
730
0
    kvp[0].key = UA_QUALIFIEDNAME(0, "address");
731
0
    UA_Variant_setScalar(&kvp[0].value, &address, &UA_TYPES[UA_TYPES_STRING]);
732
0
    kvp[1].key = UA_QUALIFIEDNAME(0, "listen");
733
0
    UA_Variant_setScalar(&kvp[1].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]);
734
0
    kvp[2].key = UA_QUALIFIEDNAME(0, "interface");
735
0
    UA_Variant_setScalar(&kvp[2].value, &addressUrl->networkInterface,
736
0
                         &UA_TYPES[UA_TYPES_STRING]);
737
0
    kvp[3].key = UA_QUALIFIEDNAME(0, "validate");
738
0
    UA_Variant_setScalar(&kvp[3].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]);
739
0
    UA_UInt16 ether_type = 0xB62C;
740
0
    kvp[4].key = UA_QUALIFIEDNAME(0, "ethertype");
741
0
    UA_Variant_setScalar(&kvp[4].value, &ether_type, &UA_TYPES[UA_TYPES_UINT16]);
742
    
743
0
    kvp[5].key = UA_QUALIFIEDNAME(0,"vid");
744
0
    UA_Variant_setScalar(&kvp[5].value, &vid, &UA_TYPES[UA_TYPES_UINT16]);
745
746
0
    kvp[6].key = UA_QUALIFIEDNAME(0,"pcp");
747
0
    UA_Variant_setScalar(&kvp[6].value, &pcp, &UA_TYPES[UA_TYPES_BYTE]);
748
749
    /* Open recv channels */
750
0
    if(validate || (c->recvChannelsSize == 0 && c->readerGroupsSize > 0)) {
751
0
        res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubRecvChannelCallback);
752
0
        if(res != UA_STATUSCODE_GOOD) {
753
0
            UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not open an ETH recv channel");
754
0
            return res;
755
0
        }
756
0
    }
757
758
    /* Open send channels */
759
0
    if(validate || (c->sendChannel == 0 && c->writerGroupsSize > 0)) {
760
0
        listen = false;
761
0
        res = c->cm->openConnection(c->cm, &kvm, psm, c, PubSubSendChannelCallback);
762
0
        if(res != UA_STATUSCODE_GOOD) {
763
0
            UA_LOG_ERROR_PUBSUB(psm->logging, c,
764
0
                                "Could not open an ETH channel for sending");
765
0
        }
766
0
    }
767
768
0
    return res;
769
0
}
770
771
static UA_Boolean
772
0
UA_PubSubConnection_canConnect(UA_PubSubConnection *c) {
773
0
    if(c->sendChannel == 0 && c->writerGroupsSize > 0)
774
0
        return true;
775
0
    if(c->recvChannelsSize == 0 && c->readerGroupsSize > 0)
776
0
        return true;
777
0
    return false;
778
0
}
779
780
static UA_StatusCode
781
UA_PubSubConnection_connect(UA_PubSubManager *psm, UA_PubSubConnection *c,
782
0
                            UA_Boolean validate) {
783
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
784
785
0
    UA_EventLoop *el = psm->sc.server->config.eventLoop;
786
0
    if(!el) {
787
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c, "No EventLoop configured");
788
0
        return UA_STATUSCODE_BADINTERNALERROR;
789
0
    }
790
791
    /* Look up the connection manager for the connection */
792
0
    ConnectionProfileMapping *profile = NULL;
793
0
    for(size_t i = 0; i < UA_PUBSUB_PROFILES_SIZE; i++) {
794
0
        if(!UA_String_equal(&c->config.transportProfileUri,
795
0
                            &connectionProfiles[i].profileURI))
796
0
            continue;
797
0
        profile = &connectionProfiles[i];
798
0
        break;
799
0
    }
800
801
0
    UA_ConnectionManager *cm = (profile) ? getCM(el, profile->protocol) : NULL;
802
0
    if(!cm || (c->cm && cm != c->cm)) {
803
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c,
804
0
                            "The requested profile \"%S\"is not supported",
805
0
                            c->config.transportProfileUri);
806
0
        return UA_STATUSCODE_BADINTERNALERROR;
807
0
    }
808
809
    /* Check the configuration address type */
810
0
    if(!UA_Variant_hasScalarType(&c->config.address,
811
0
                                 &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])) {
812
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c, "No NetworkAddressUrlDataType "
813
0
                            "for the address configuration");
814
0
        return UA_STATUSCODE_BADINTERNALERROR;
815
0
    }
816
817
    /* Update the connection settings from the profile information */
818
0
    c->cm = cm;
819
0
    c->json = profile->json;
820
821
    /* Some protocols (such as MQTT) don't connect at this level */
822
0
    return (profile->connect) ?
823
0
        profile->connect(psm, c, validate) : UA_STATUSCODE_GOOD;
824
0
}
825
826
/**************/
827
/* Server API */
828
/**************/
829
830
UA_StatusCode
831
UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
832
0
                                    UA_PubSubConnectionConfig *config) {
833
0
    if(!server || !config)
834
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
835
0
    lockServer(server);
836
0
    UA_PubSubConnection *c = UA_PubSubConnection_find(getPSM(server), connection);
837
0
    UA_StatusCode res = (c) ?
838
0
        UA_PubSubConnectionConfig_copy(&c->config, config) : UA_STATUSCODE_BADNOTFOUND;
839
0
    unlockServer(server);
840
0
    return res;
841
0
}
842
843
UA_StatusCode
844
UA_Server_addPubSubConnection(UA_Server *server,
845
                              const UA_PubSubConnectionConfig *cc,
846
0
                              UA_NodeId *cId) {
847
0
    if(!server || !cc)
848
0
        return UA_STATUSCODE_BADINTERNALERROR;
849
0
    lockServer(server);
850
0
    UA_PubSubManager *psm = getPSM(server);
851
0
    UA_StatusCode res = (psm) ?
852
0
        UA_PubSubConnection_create(psm, cc, cId) : UA_STATUSCODE_BADINTERNALERROR;
853
0
    unlockServer(server);
854
0
    return res;
855
0
}
856
857
UA_StatusCode
858
0
UA_Server_removePubSubConnection(UA_Server *server, const UA_NodeId cId) {
859
0
    if(!server)
860
0
        return UA_STATUSCODE_BADINTERNALERROR;
861
0
    lockServer(server);
862
0
    UA_PubSubManager *psm = getPSM(server);
863
0
    UA_PubSubConnection *c = UA_PubSubConnection_find(psm, cId);
864
0
    if(!c) {
865
0
        unlockServer(server);
866
0
        return UA_STATUSCODE_BADNOTFOUND;
867
0
    }
868
0
    UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED);
869
0
    UA_PubSubConnection_delete(psm, c);
870
0
    unlockServer(server);
871
0
    return UA_STATUSCODE_GOOD;
872
0
}
873
874
UA_StatusCode
875
0
UA_Server_enablePubSubConnection(UA_Server *server, const UA_NodeId cId) {
876
0
    if(!server)
877
0
        return UA_STATUSCODE_BADINTERNALERROR;
878
0
    lockServer(server);
879
0
    UA_PubSubManager *psm = getPSM(server);
880
0
    UA_StatusCode res = (psm) ?
881
0
        enablePubSubConnection(psm, cId) : UA_STATUSCODE_BADINTERNALERROR;
882
0
    unlockServer(server);
883
0
    return res;
884
0
}
885
886
UA_StatusCode
887
0
UA_Server_disablePubSubConnection(UA_Server *server, const UA_NodeId cId) {
888
0
    if(!server)
889
0
        return UA_STATUSCODE_BADINTERNALERROR;
890
0
    lockServer(server);
891
0
    UA_PubSubManager *psm = getPSM(server);
892
0
    UA_StatusCode res = (psm) ?
893
0
        disablePubSubConnection(psm, cId) : UA_STATUSCODE_BADINTERNALERROR;
894
0
    unlockServer(server);
895
0
    return res;
896
0
}
897
898
UA_StatusCode
899
UA_Server_processPubSubConnectionReceive(UA_Server *server,
900
                                         const UA_NodeId connectionId,
901
0
                                         const UA_ByteString packet) {
902
0
    if(!server)
903
0
        return UA_STATUSCODE_BADINTERNALERROR;
904
0
    lockServer(server);
905
0
    UA_StatusCode res = UA_STATUSCODE_BADINTERNALERROR;
906
0
    UA_PubSubManager *psm = getPSM(server);
907
0
    if(psm) {
908
0
        UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId);
909
0
        if(c) {
910
0
            res = UA_STATUSCODE_GOOD;
911
0
            UA_PubSubConnection_process(psm, c, packet);
912
0
        } else {
913
0
            res = UA_STATUSCODE_BADNOTFOUND;
914
0
        }
915
0
    }
916
0
    unlockServer(server);
917
0
    return res;
918
0
}
919
920
UA_StatusCode
921
UA_Server_updatePubSubConnectionConfig(UA_Server *server,
922
                                       const UA_NodeId connectionId,
923
0
                                       const UA_PubSubConnectionConfig *config) {
924
0
    if(!server || !config)
925
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
926
927
0
    lockServer(server);
928
929
    /* Find the connection */
930
0
    UA_PubSubManager *psm = getPSM(server);
931
0
    UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId);
932
0
    if(!c) {
933
0
        unlockServer(server);
934
0
        return UA_STATUSCODE_BADNOTFOUND;
935
0
    }
936
937
    /* Verify the connection is disabled */
938
0
    if(UA_PubSubState_isEnabled(c->head.state)) {
939
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c,
940
0
                            "The PubSubConnection must be disabled to update the config");
941
0
        unlockServer(server);
942
0
        return UA_STATUSCODE_BADINTERNALERROR;
943
0
    }
944
945
    /* Store the old config */
946
0
    UA_PubSubConnectionConfig oldConfig = c->config;
947
0
    memset(&c->config, 0, sizeof(UA_PubSubConnectionConfig));
948
949
    /* Copy the connection config */
950
0
    UA_StatusCode res = UA_PubSubConnectionConfig_copy(config, &c->config);
951
0
    if(res != UA_STATUSCODE_GOOD)
952
0
        goto errout;
953
954
    /* Validate-connect to check the parameters */
955
0
    res = UA_PubSubConnection_connect(psm, c, true);
956
0
    if(res != UA_STATUSCODE_GOOD) {
957
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c, "The connection parameters did not validate");
958
0
        goto errout;
959
0
    }
960
961
0
    UA_PubSubConnectionConfig_clear(&oldConfig);
962
0
    unlockServer(server);
963
0
    return UA_STATUSCODE_GOOD;
964
965
0
 errout:
966
    /* Restore the old config */
967
0
    UA_PubSubConnectionConfig_clear(&c->config);
968
0
    c->config = oldConfig;
969
0
    unlockServer(server);
970
0
    return res;
971
0
}
972
973
#endif /* UA_ENABLE_PUBSUB */