Coverage Report

Created: 2025-08-26 06:48

/src/mosquitto/src/bridge.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <stdio.h>
24
#include <string.h>
25
26
#ifndef WIN32
27
#include <netdb.h>
28
#include <sys/socket.h>
29
#include <netinet/in.h>
30
#include <netinet/tcp.h>
31
#else
32
#include <winsock2.h>
33
#include <ws2tcpip.h>
34
#endif
35
36
#ifndef WIN32
37
#include <unistd.h>
38
#else
39
#include <process.h>
40
#include <winsock2.h>
41
#include <ws2tcpip.h>
42
#endif
43
44
#include "mosquitto.h"
45
#include "mosquitto_broker_internal.h"
46
#include "mosquitto_internal.h"
47
#include "net_mosq.h"
48
#include "packet_mosq.h"
49
#include "property_common.h"
50
#include "send_mosq.h"
51
#include "sys_tree.h"
52
#include "tls_mosq.h"
53
#include "util_mosq.h"
54
#include "will_mosq.h"
55
#include "utlist.h"
56
57
#ifdef WITH_BRIDGE
58
59
static void bridge__update_backoff(struct mosquitto__bridge *bridge);
60
#if defined(__GLIBC__) && defined(WITH_ADNS)
61
static int bridge__connect_step1(struct mosquitto *context);
62
static int bridge__connect_step2(struct mosquitto *context);
63
#endif
64
static void bridge__packet_cleanup(struct mosquitto *context);
65
66
static struct mosquitto *bridge__new(struct mosquitto__bridge *bridge)
67
0
{
68
0
  struct mosquitto *new_context = NULL;
69
0
  struct mosquitto **bridges;
70
0
  char *local_id;
71
72
0
  assert(bridge);
73
74
0
  local_id = mosquitto_strdup(bridge->local_clientid);
75
0
  if(!local_id){
76
0
    return NULL;
77
0
  }
78
79
0
  HASH_FIND(hh_id, db.contexts_by_id, local_id, strlen(local_id), new_context);
80
0
  if(new_context){
81
    /* (possible from persistent db) */
82
0
    mosquitto_FREE(local_id);
83
0
  }else{
84
    /* id wasn't found, so generate a new context */
85
0
    new_context = context__init();
86
0
    if(!new_context){
87
0
      mosquitto_FREE(local_id);
88
0
      return NULL;
89
0
    }
90
0
    new_context->id = local_id;
91
0
    context__add_to_by_id(new_context);
92
0
  }
93
0
  new_context->transport = mosq_t_tcp;
94
0
  new_context->bridge = bridge;
95
0
  new_context->is_bridge = true;
96
97
0
  new_context->username = bridge->remote_username;
98
0
  new_context->password = bridge->remote_password;
99
100
0
#ifdef WITH_TLS
101
0
  new_context->tls_cafile = bridge->tls_cafile;
102
0
  new_context->tls_capath = bridge->tls_capath;
103
0
  new_context->tls_certfile = bridge->tls_certfile;
104
0
  new_context->tls_keyfile = bridge->tls_keyfile;
105
0
  new_context->tls_cert_reqs = SSL_VERIFY_PEER;
106
0
  new_context->tls_ocsp_required = bridge->tls_ocsp_required;
107
0
  new_context->tls_version = bridge->tls_version;
108
0
  new_context->tls_insecure = bridge->tls_insecure;
109
0
  new_context->tls_alpn = bridge->tls_alpn;
110
0
  new_context->tls_ciphers = bridge->tls_ciphers;
111
0
  new_context->tls_13_ciphers = bridge->tls_13_ciphers;
112
0
  new_context->tls_engine = NULL;
113
0
  new_context->tls_keyform = mosq_k_pem;
114
0
  new_context->tls_use_os_certs = bridge->tls_use_os_certs;
115
0
  new_context->ssl_ctx_defaults = true;
116
0
#ifdef FINAL_WITH_TLS_PSK
117
0
  new_context->tls_psk_identity = bridge->tls_psk_identity;
118
0
  new_context->tls_psk = bridge->tls_psk;
119
0
#endif
120
0
#endif
121
122
0
  bridge->try_private_accepted = true;
123
0
  if(bridge->clean_start_local == -1){
124
    /* default to "regular" clean start setting */
125
0
    bridge->clean_start_local = bridge->clean_start;
126
0
  }
127
0
  new_context->retain_available = bridge->outgoing_retain;
128
0
  new_context->protocol = bridge->protocol_version;
129
0
  if(!bridge->clean_start_local){
130
0
    new_context->session_expiry_interval = UINT32_MAX;
131
0
    plugin_persist__handle_client_add(new_context);
132
0
    if(new_context->expiry_list_item){
133
      /* We've restored from persistence and been added to the session
134
       * expiry list, even though we should never be expired */
135
0
      session_expiry__remove(new_context);
136
0
    }
137
0
  }
138
139
0
  bridges = mosquitto_realloc(db.bridges, (size_t)(db.bridge_count+1)*sizeof(struct mosquitto *));
140
0
  if(bridges){
141
0
    db.bridges = bridges;
142
0
    db.bridge_count++;
143
0
    db.bridges[db.bridge_count-1] = new_context;
144
0
  }else{
145
0
    return NULL;
146
0
  }
147
148
0
  return new_context;
149
0
}
150
151
static void bridge__destroy(struct mosquitto *context)
152
0
{
153
0
  send__disconnect(context, MQTT_RC_SUCCESS, NULL);
154
0
  context__cleanup(context, true);
155
0
}
156
157
void bridge__start_all(void)
158
0
{
159
0
  for(int i=0; i<db.config->bridge_count; i++){
160
0
    struct mosquitto *context;
161
0
    int ret;
162
163
0
    context = bridge__new(db.config->bridges[i]);
164
0
    if(!context){
165
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
166
0
      return;
167
0
    }
168
0
    assert(context);
169
170
#if defined(__GLIBC__) && defined(WITH_ADNS)
171
    context->bridge->restart_t = 1; /* force quick restart of bridge */
172
    loop__update_next_event(1000);
173
    ret = bridge__connect_step1(context);
174
#else
175
0
    ret = bridge__connect(context);
176
0
#endif
177
178
0
    if(ret && ret != MOSQ_ERR_CONN_PENDING){
179
0
      log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect bridge %s.",
180
0
          context->bridge->name);
181
0
    }
182
183
0
    db.config->bridges[i] = NULL;
184
0
  }
185
0
}
186
187
static int bridge__set_tcp_keepalive(struct mosquitto *context)
188
0
{
189
0
  unsigned int idle = context->bridge->tcp_keepalive_idle;
190
0
  unsigned int interval = context->bridge->tcp_keepalive_interval;
191
0
  unsigned int counter = context->bridge->tcp_keepalive_counter;
192
0
  unsigned int enabled = 1;
193
0
  bool ret;
194
195
0
  if(idle == 0 || interval == 0 || counter == 0) return MOSQ_ERR_SUCCESS;
196
197
#ifdef WIN32
198
  ret =
199
    setsockopt(context->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&enabled, sizeof(enabled)) ||
200
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPIDLE, (char *)&idle, sizeof(idle)) ||
201
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPINTVL, (char *)&interval, sizeof(interval)) ||
202
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPCNT, (char *)&counter, sizeof(counter));
203
#else
204
0
  ret =
205
0
    setsockopt(context->sock, SOL_SOCKET, SO_KEEPALIVE, (const void*)&enabled, sizeof(enabled)) ||
206
0
#ifndef __APPLE__
207
0
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPIDLE, (const void*)&idle, sizeof(idle)) ||
208
0
#endif
209
0
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPINTVL, (const void*)&interval, sizeof(interval)) ||
210
0
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPCNT, (const void*)&counter, sizeof(counter));
211
0
#endif
212
213
0
  if(ret) return MOSQ_ERR_UNKNOWN;
214
215
0
  return MOSQ_ERR_SUCCESS;
216
0
}
217
218
#ifdef WITH_TCP_USER_TIMEOUT
219
0
static int bridge__set_tcp_user_timeout(struct mosquitto *context) {
220
0
  int timeout = context->bridge->tcp_user_timeout;
221
0
  if(timeout >= 0) {
222
0
    if(setsockopt(context->sock, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&timeout, sizeof(timeout))) {
223
0
      return MOSQ_ERR_UNKNOWN;
224
0
    }
225
0
  }
226
227
0
  return MOSQ_ERR_SUCCESS;
228
0
}
229
#endif
230
231
#if defined(__GLIBC__) && defined(WITH_ADNS)
232
static int bridge__connect_step1(struct mosquitto *context)
233
{
234
  int rc;
235
  char *notification_topic;
236
  size_t notification_topic_len;
237
  uint8_t notification_payload;
238
  struct mosquitto__bridge_topic *cur_topic;
239
  uint8_t qos;
240
241
  if(!context || !context->bridge) return MOSQ_ERR_INVAL;
242
243
  mosquitto__set_state(context, mosq_cs_new);
244
  context->sock = INVALID_SOCKET;
245
  context->last_msg_in = db.now_s;
246
  context->next_msg_out = db.now_s + context->bridge->keepalive;
247
  context->keepalive = context->bridge->keepalive;
248
  context->clean_start = context->bridge->clean_start;
249
  context->in_packet.payload = NULL;
250
  context->ping_t = 0;
251
  context->bridge->lazy_reconnect = false;
252
  context->maximum_packet_size = context->bridge->maximum_packet_size;
253
  bridge__packet_cleanup(context);
254
  db__message_reconnect_reset(context);
255
256
  db__messages_delete(context, false);
257
258
  /* Delete all local subscriptions even for clean_start==false. We don't
259
   * remove any messages and the next loop carries out the resubscription
260
   * anyway. This means any unwanted subs will be removed.
261
   */
262
  sub__clean_session(context);
263
264
  LL_FOREACH(context->bridge->topics, cur_topic){
265
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
266
      log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
267
      if(cur_topic->qos > context->max_qos){
268
        qos = context->max_qos;
269
      }else{
270
        qos = cur_topic->qos;
271
      }
272
      struct mosquitto_subscription sub;
273
      sub.topic_filter = cur_topic->local_topic;
274
      sub.identifier = 0;
275
      sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
276
      if(sub__add(context, &sub) > 0){
277
        return 1;
278
      }
279
      retain__queue(context, &sub);
280
    }
281
  }
282
283
  if(context->bridge->notifications){
284
    if(context->max_qos == 0){
285
      qos = 0;
286
    }else{
287
      qos = 1;
288
    }
289
    if(context->bridge->notification_topic){
290
      if(!context->bridge->initial_notification_done){
291
        notification_payload = '0';
292
        db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
293
        context->bridge->initial_notification_done = true;
294
      }
295
      notification_payload = '0';
296
      rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
297
      if(rc != MOSQ_ERR_SUCCESS){
298
        return rc;
299
      }
300
    }else{
301
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
302
      notification_topic = mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
303
      if(!notification_topic) return MOSQ_ERR_NOMEM;
304
305
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
306
307
      if(!context->bridge->initial_notification_done){
308
        notification_payload = '0';
309
        db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
310
        context->bridge->initial_notification_done = true;
311
      }
312
313
      notification_payload = '0';
314
      rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
315
      mosquitto_FREE(notification_topic);
316
      if(rc != MOSQ_ERR_SUCCESS){
317
        return rc;
318
      }
319
    }
320
  }
321
322
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge (step 1) %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
323
  rc = net__try_connect_step1(context, context->bridge->addresses[context->bridge->cur_address].address);
324
  if(rc > 0 ){
325
    if(rc == MOSQ_ERR_TLS){
326
      mux__delete(context);
327
      net__socket_close(context);
328
      return rc; /* Error already printed */
329
    }else if(rc == MOSQ_ERR_ERRNO){
330
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
331
    }else if(rc == MOSQ_ERR_EAI){
332
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
333
    }
334
335
    return rc;
336
  }
337
338
  return MOSQ_ERR_SUCCESS;
339
}
340
341
342
static int bridge__connect_step2(struct mosquitto *context)
343
{
344
  int rc;
345
346
  if(!context || !context->bridge) return MOSQ_ERR_INVAL;
347
348
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge (step 2) %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
349
  rc = net__try_connect_step2(context, context->bridge->addresses[context->bridge->cur_address].port, &context->sock);
350
  if(rc > 0){
351
    if(rc == MOSQ_ERR_TLS){
352
      mux__delete(context);
353
      net__socket_close(context);
354
      return rc; /* Error already printed */
355
    }else if(rc == MOSQ_ERR_ERRNO){
356
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
357
    }else if(rc == MOSQ_ERR_EAI){
358
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
359
    }
360
361
    return rc;
362
  }
363
364
  HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(context->sock), context);
365
366
  if(rc == MOSQ_ERR_CONN_PENDING){
367
    mosquitto__set_state(context, mosq_cs_connect_pending);
368
    mux__add_out(context);
369
  }
370
  return rc;
371
}
372
373
374
int bridge__connect_step3(struct mosquitto *context)
375
{
376
  int rc;
377
  mosquitto_property receive_maximum;
378
  mosquitto_property session_expiry_interval;
379
  mosquitto_property topic_alias_max;
380
  mosquitto_property *properties = NULL;
381
382
  rc = net__socket_connect_step3(context, context->bridge->addresses[context->bridge->cur_address].address);
383
  if(rc > 0){
384
    if(rc == MOSQ_ERR_TLS){
385
      mux__delete(context);
386
      net__socket_close(context);
387
      return rc; /* Error already printed */
388
    }else if(rc == MOSQ_ERR_ERRNO){
389
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
390
    }else if(rc == MOSQ_ERR_EAI){
391
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
392
    }
393
394
    return rc;
395
  }
396
397
  if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
398
    context->bridge->primary_retry = db.now_s + 5;
399
    loop__update_next_event(5000);
400
  }
401
402
  if(bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN;
403
#ifdef WITH_TCP_USER_TIMEOUT
404
  if(bridge__set_tcp_user_timeout(context)) return MOSQ_ERR_UNKNOWN;
405
#endif
406
407
  if(context->bridge->receive_maximum != 0){
408
    receive_maximum.value.i16 = context->bridge->receive_maximum;
409
    receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
410
    receive_maximum.property_type = MQTT_PROP_TYPE_INT16;
411
    receive_maximum.client_generated = false;
412
    receive_maximum.next = properties;
413
    properties = &receive_maximum;
414
  }
415
  if(context->bridge->session_expiry_interval != 0){
416
    session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
417
    session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
418
    session_expiry_interval.property_type = MQTT_PROP_TYPE_INT32;
419
    session_expiry_interval.client_generated = false;
420
    session_expiry_interval.next = properties;
421
    properties = &session_expiry_interval;
422
  }
423
  if(context->bridge->max_topic_alias != 0){
424
    topic_alias_max.value.i16 = context->bridge->max_topic_alias;
425
    topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
426
    topic_alias_max.property_type = MQTT_PROP_TYPE_INT16;
427
    topic_alias_max.client_generated = false;
428
    topic_alias_max.next = properties;
429
    properties = &topic_alias_max;
430
  }
431
432
  rc = send__connect(context, context->keepalive, context->clean_start, properties);
433
  if(rc == MOSQ_ERR_SUCCESS){
434
    return MOSQ_ERR_SUCCESS;
435
  }else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
436
    return MOSQ_ERR_SUCCESS;
437
  }else{
438
    if(rc == MOSQ_ERR_TLS){
439
      return rc; /* Error already printed */
440
    }else if(rc == MOSQ_ERR_ERRNO){
441
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
442
    }else if(rc == MOSQ_ERR_EAI){
443
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
444
    }
445
    mux__delete(context);
446
    net__socket_close(context);
447
    return rc;
448
  }
449
}
450
#else
451
452
int bridge__connect(struct mosquitto *context)
453
0
{
454
0
  int rc, rc2;
455
0
  char *notification_topic = NULL;
456
0
  size_t notification_topic_len;
457
0
  uint8_t notification_payload;
458
0
  struct mosquitto__bridge_topic *cur_topic;
459
0
  uint8_t qos;
460
461
0
  mosquitto_property receive_maximum;
462
0
  mosquitto_property session_expiry_interval;
463
0
  mosquitto_property topic_alias_max;
464
0
  mosquitto_property *properties = NULL;
465
466
0
  if(!context || !context->bridge) return MOSQ_ERR_INVAL;
467
468
0
  mosquitto__set_state(context, mosq_cs_new);
469
0
  context->sock = INVALID_SOCKET;
470
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
471
0
  context->last_msg_in = db.now_s;
472
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
473
0
  context->next_msg_out = db.now_s + context->bridge->keepalive;
474
0
  context->keepalive = context->bridge->keepalive;
475
0
  context->clean_start = context->bridge->clean_start;
476
0
  context->in_packet.payload = NULL;
477
0
  context->ping_t = 0;
478
0
  context->bridge->lazy_reconnect = false;
479
0
  context->maximum_packet_size = context->bridge->maximum_packet_size;
480
0
  bridge__packet_cleanup(context);
481
0
  db__message_reconnect_reset(context);
482
483
0
  db__messages_delete(context, false);
484
485
  /* Delete all local subscriptions even for clean_start==false. We don't
486
   * remove any messages and the next loop carries out the resubscription
487
   * anyway. This means any unwanted subs will be removed.
488
   */
489
0
  sub__clean_session(context);
490
491
0
  LL_FOREACH(context->bridge->topics, cur_topic){
492
0
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
493
0
      log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
494
0
      if(cur_topic->qos > context->max_qos){
495
0
        qos = context->max_qos;
496
0
      }else{
497
0
        qos = cur_topic->qos;
498
0
      }
499
0
      struct mosquitto_subscription sub;
500
0
      sub.topic_filter = cur_topic->local_topic;
501
0
      sub.identifier = 0;
502
0
      sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
503
0
      if(sub__add(context, &sub) > 0){
504
0
        return 1;
505
0
      }
506
0
    }
507
0
  }
508
509
0
  if(context->bridge->notifications){
510
0
    if(context->max_qos == 0){
511
0
      qos = 0;
512
0
    }else{
513
0
      qos = 1;
514
0
    }
515
0
    if(context->bridge->notification_topic){
516
0
      if(!context->bridge->initial_notification_done){
517
0
        notification_payload = '0';
518
0
        db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
519
0
        context->bridge->initial_notification_done = true;
520
0
      }
521
522
0
      notification_payload = '0';
523
0
      rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
524
0
      if(rc != MOSQ_ERR_SUCCESS){
525
0
        return rc;
526
0
      }
527
0
    }else{
528
0
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
529
0
      notification_topic = mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
530
0
      if(!notification_topic) return MOSQ_ERR_NOMEM;
531
532
0
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
533
534
0
      if(!context->bridge->initial_notification_done){
535
0
        notification_payload = '0';
536
0
        db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
537
0
        context->bridge->initial_notification_done = true;
538
0
      }
539
540
0
      notification_payload = '0';
541
0
      rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
542
0
      if(rc != MOSQ_ERR_SUCCESS){
543
0
        mosquitto_FREE(notification_topic);
544
0
        return rc;
545
0
      }
546
0
      mosquitto_FREE(notification_topic);
547
0
    }
548
0
  }
549
550
0
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
551
0
  rc = net__socket_connect(context,
552
0
      context->bridge->addresses[context->bridge->cur_address].address,
553
0
      context->bridge->addresses[context->bridge->cur_address].port,
554
0
      context->bridge->bind_address,
555
0
      false);
556
557
0
  if(rc > 0){
558
0
    if(rc == MOSQ_ERR_TLS){
559
0
      mux__delete(context);
560
0
      net__socket_close(context);
561
0
      return rc; /* Error already printed */
562
0
    }else if(rc == MOSQ_ERR_ERRNO){
563
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
564
0
    }else if(rc == MOSQ_ERR_EAI){
565
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
566
0
    }
567
568
0
    return rc;
569
0
  }else if(rc == MOSQ_ERR_CONN_PENDING){
570
0
    mosquitto__set_state(context, mosq_cs_connect_pending);
571
0
    mux__add_out(context);
572
0
  }
573
574
0
  HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(context->sock), context);
575
576
0
  if (bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN;
577
0
#ifdef WITH_TCP_USER_TIMEOUT
578
0
  if(bridge__set_tcp_user_timeout(context)) return MOSQ_ERR_UNKNOWN;
579
0
#endif
580
581
0
  if(context->bridge->receive_maximum != 0){
582
0
    receive_maximum.value.i16 = context->bridge->receive_maximum;
583
0
    receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
584
0
    receive_maximum.property_type = MQTT_PROP_TYPE_INT16;
585
0
    receive_maximum.client_generated = false;
586
0
    receive_maximum.next = properties;
587
0
    properties = &receive_maximum;
588
0
  }
589
0
  if(context->bridge->session_expiry_interval != 0){
590
0
    session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
591
0
    session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
592
0
    session_expiry_interval.property_type = MQTT_PROP_TYPE_INT32;
593
0
    session_expiry_interval.client_generated = false;
594
0
    session_expiry_interval.next = properties;
595
0
    properties = &session_expiry_interval;
596
0
  }
597
0
  if(context->bridge->max_topic_alias != 0){
598
0
    topic_alias_max.value.i16 = context->bridge->max_topic_alias;
599
0
    topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
600
0
    topic_alias_max.property_type = MQTT_PROP_TYPE_INT16;
601
0
    topic_alias_max.client_generated = false;
602
0
    topic_alias_max.next = properties;
603
0
    properties = &topic_alias_max;
604
0
  }
605
606
0
  rc2 = send__connect(context, context->keepalive, context->clean_start, properties);
607
0
  if(rc2 == MOSQ_ERR_SUCCESS){
608
0
    return rc;
609
0
  }else if(rc2 == MOSQ_ERR_ERRNO && errno == ENOTCONN){
610
0
    return MOSQ_ERR_SUCCESS;
611
0
  }else{
612
0
    if(rc2 == MOSQ_ERR_TLS){
613
0
      return rc2; /* Error already printed */
614
0
    }else if(rc2 == MOSQ_ERR_ERRNO){
615
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
616
0
    }else if(rc2 == MOSQ_ERR_EAI){
617
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
618
0
    }
619
0
    mux__delete(context);
620
0
    net__socket_close(context);
621
0
    return rc2;
622
0
  }
623
0
}
624
#endif
625
626
627
int bridge__on_connect(struct mosquitto *context)
628
0
{
629
0
  char *notification_topic;
630
0
  size_t notification_topic_len;
631
0
  struct mosquitto__bridge_topic *cur_topic;
632
0
  int sub_opts;
633
0
  bool retain = true;
634
0
  uint8_t qos;
635
636
0
  if(context->bridge->notifications){
637
0
    if(context->max_qos == 0){
638
0
      qos = 0;
639
0
    }else{
640
0
      qos = 1;
641
0
    }
642
0
    if(!context->retain_available){
643
0
      retain = false;
644
0
    }
645
0
    char notification_payload = '1';
646
0
    if(context->bridge->notification_topic){
647
0
      if(!context->bridge->notifications_local_only){
648
0
        if(send__real_publish(context, mosquitto__mid_generate(context),
649
0
            context->bridge->notification_topic, 1, &notification_payload, qos, retain, 0, 0, NULL, 0)){
650
651
0
          return 1;
652
0
        }
653
0
      }
654
0
      db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
655
0
    }else{
656
0
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
657
0
      notification_topic = mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
658
0
      if(!notification_topic) return MOSQ_ERR_NOMEM;
659
660
0
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
661
0
      notification_payload = '1';
662
0
      if(!context->bridge->notifications_local_only){
663
0
        if(send__real_publish(context, mosquitto__mid_generate(context),
664
0
            notification_topic, 1, &notification_payload, qos, retain, 0, 0, NULL, 0)){
665
666
0
          mosquitto_FREE(notification_topic);
667
0
          return 1;
668
0
        }
669
0
      }
670
0
      db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
671
0
      mosquitto_FREE(notification_topic);
672
0
    }
673
0
  }
674
675
0
  LL_FOREACH(context->bridge->topics, cur_topic){
676
0
    if(cur_topic->direction == bd_in || cur_topic->direction == bd_both){
677
0
      if(cur_topic->qos > context->max_qos){
678
0
        sub_opts = context->max_qos;
679
0
      }else{
680
0
        sub_opts = cur_topic->qos;
681
0
      }
682
0
      if(context->bridge->protocol_version == mosq_p_mqtt5){
683
0
        sub_opts = sub_opts
684
0
          | MQTT_SUB_OPT_NO_LOCAL
685
0
          | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
686
0
          | MQTT_SUB_OPT_SEND_RETAIN_ALWAYS;
687
0
      }
688
0
      if(send__subscribe(context, NULL, 1, &cur_topic->remote_topic, sub_opts, NULL)){
689
0
        return 1;
690
0
      }
691
0
    }else{
692
0
      if(context->bridge->attempt_unsubscribe){
693
0
        if(send__unsubscribe(context, NULL, 1, &cur_topic->remote_topic, NULL)){
694
          /* direction = inwards only. This means we should not be subscribed
695
          * to the topic. It is possible that we used to be subscribed to
696
          * this topic so unsubscribe. */
697
0
          return 1;
698
0
        }
699
0
      }
700
0
    }
701
0
  }
702
0
  struct mosquitto_subscription sub;
703
0
  memset(&sub, 0, sizeof(sub));
704
0
  LL_FOREACH(context->bridge->topics, cur_topic){
705
0
    sub.topic_filter = cur_topic->local_topic;
706
0
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
707
0
      if(cur_topic->qos > context->max_qos){
708
0
        sub.options = context->max_qos;
709
0
      }else{
710
0
        sub.options = cur_topic->qos;
711
0
      }
712
0
      retain__queue(context, &sub);
713
0
    }
714
0
  }
715
716
0
  context->bridge->connected_at = db.now_s;
717
718
0
  return MOSQ_ERR_SUCCESS;
719
0
}
720
721
722
int bridge__register_local_connections(void)
723
0
{
724
0
  struct mosquitto *context, *ctxt_tmp = NULL;
725
726
0
  HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
727
0
    if(context->bridge){
728
0
      if(mux__new(context)){
729
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error in initial bridge registration: %s", strerror(errno));
730
0
        return MOSQ_ERR_UNKNOWN;
731
0
      }
732
0
      mux__add_out(context);
733
0
    }
734
0
  }
735
0
  return MOSQ_ERR_SUCCESS;
736
0
}
737
738
739
void bridge__reload(void)
740
0
{
741
0
  int i;
742
0
  int j;
743
744
  // destroy old bridges that dissappeared
745
0
  for(i=0; i<db.bridge_count; i++){
746
0
    for(j=0; j<db.config->bridge_count; j++){
747
0
      if(!strcmp(db.bridges[i]->bridge->name, db.config->bridges[j]->name)) break;
748
0
    }
749
750
0
    if(j==db.config->bridge_count){
751
0
      bridge__destroy(db.bridges[i]);
752
0
    }
753
0
  }
754
755
0
  for(i=0; i<db.config->bridge_count; i++){
756
0
    for(j=0; j<db.bridge_count; j++){
757
0
      if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)) break;
758
0
    }
759
760
0
    if(j==db.bridge_count){
761
      // a new bridge was found, create it
762
0
      bridge__new(db.config->bridges[i]);
763
0
      db.config->bridges[i] = NULL;
764
0
      continue;
765
0
    }
766
767
0
    if(db.config->bridges[i]->reload_type == brt_immediate){
768
      // in this case, an existing bridge should match
769
0
      for(j=0; j<db.bridge_count; j++){
770
0
        if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)) break;
771
0
      }
772
773
0
      assert(j<db.bridge_count);
774
0
      db.bridges[j]->will_delay_interval = 0;
775
0
      bridge__destroy(db.bridges[j]);
776
0
      bridge__new(db.config->bridges[i]);
777
0
      db.config->bridges[i] = NULL;
778
0
    }
779
0
  }
780
0
}
781
782
void bridge__db_cleanup(void)
783
0
{
784
0
  int i;
785
786
0
  for(i=0; i<db.bridge_count; i++){
787
0
    if(db.bridges[i]){
788
0
      context__cleanup(db.bridges[i], true);
789
0
    }
790
0
  }
791
0
  mosquitto_FREE(db.bridges);
792
0
}
793
794
795
void bridge__cleanup(struct mosquitto *context)
796
0
{
797
0
  int i;
798
799
0
  assert(db.bridge_count > 0);
800
801
0
  for(i=0; i<db.bridge_count; i++){
802
0
    if(db.bridges[i] == context){
803
0
      db.bridges[i] = db.bridges[db.bridge_count-1];
804
0
      break;
805
0
    }
806
0
  }
807
808
0
  db.bridge_count--;
809
0
  if(db.bridge_count == 0){
810
0
    mosquitto_FREE(db.bridges);
811
0
  }else{
812
0
    db.bridges = mosquitto_realloc(db.bridges, (unsigned) db.bridge_count * sizeof(db.bridges[0]));
813
0
  }
814
815
0
  mosquitto_FREE(context->bridge->name);
816
0
  mosquitto_FREE(context->bridge->local_clientid);
817
0
  mosquitto_FREE(context->bridge->local_username);
818
0
  mosquitto_FREE(context->bridge->local_password);
819
0
  mosquitto_FREE(context->bridge->tls_certfile);
820
0
  mosquitto_FREE(context->bridge->tls_keyfile);
821
822
0
  if(context->bridge->remote_clientid != context->id){
823
0
    mosquitto_FREE(context->bridge->remote_clientid);
824
0
  }
825
0
  context->bridge->remote_clientid = NULL;
826
827
0
  if(context->bridge->remote_username != context->username){
828
0
    mosquitto_FREE(context->bridge->remote_username);
829
0
  }
830
0
  context->bridge->remote_username = NULL;
831
832
0
  if(context->bridge->remote_password != context->password){
833
0
    mosquitto_FREE(context->bridge->remote_password);
834
0
  }
835
0
  context->bridge->remote_password = NULL;
836
0
#ifdef WITH_TLS
837
0
  if(context->ssl_ctx){
838
0
    SSL_CTX_free(context->ssl_ctx);
839
0
    context->ssl_ctx = NULL;
840
0
  }
841
0
#endif
842
843
0
  for(i=0; i<context->bridge->address_count; i++){
844
0
    mosquitto_FREE(context->bridge->addresses[i].address);
845
0
  }
846
847
0
  mosquitto_FREE(context->bridge->addresses);
848
849
0
  config__bridge_cleanup(context->bridge);
850
0
  context->bridge = NULL;
851
0
}
852
853
854
static void bridge__packet_cleanup(struct mosquitto *context)
855
0
{
856
0
  struct mosquitto__packet *packet;
857
0
  if(!context) return;
858
859
0
  while(context->out_packet){
860
0
    packet = context->out_packet;
861
0
    context->out_packet = context->out_packet->next;
862
0
    mosquitto_FREE(packet);
863
0
  }
864
0
  context->out_packet = NULL;
865
0
  context->out_packet_last = NULL;
866
0
  metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
867
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
868
0
  context->out_packet_count = 0;
869
0
  context->out_packet_bytes = 0;
870
871
0
  packet__cleanup(&(context->in_packet));
872
0
}
873
874
/*
 * Return a pseudo-random integer in the half-open range [low, high).
 *
 * If the range is empty or inverted (high <= low) the function returns
 * `low` instead of performing a modulo by zero or by a negative value.
 *
 * The random word is reduced using its magnitude, computed in unsigned
 * arithmetic so that INT_MIN does not trigger the undefined behaviour of
 * abs(INT_MIN). For every other value the result matches
 * (abs(r) % (high - low)) + low.
 */
static int rand_between(int low, int high)
{
  /* Zero-initialised so that, should mosquitto_getrandom() leave the
   * buffer untouched, the result is simply `low` rather than garbage. */
  int r = 0;
  unsigned int magnitude;
  unsigned int span;

  if(high <= low){
    return low;
  }

  mosquitto_getrandom(&r, sizeof(int));

  span = (unsigned int)high - (unsigned int)low;
  magnitude = (r < 0) ? 0u - (unsigned int)r : (unsigned int)r;

  return low + (int)(magnitude % span);
}
880
881
static void bridge__backoff_step(struct mosquitto__bridge *bridge)
882
0
{
883
  /*
884
    “Decorrelated Jitter” calculation, according to:
885
      https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
886
  */
887
888
0
  bridge->restart_timeout = rand_between(bridge->backoff_base, bridge->restart_timeout * 3);
889
0
  if(bridge->restart_timeout > bridge->backoff_cap){
890
0
    bridge->restart_timeout = bridge->backoff_cap;
891
0
  }
892
0
}
893
894
static void bridge__backoff_reset(struct mosquitto__bridge *bridge)
895
0
{
896
0
  bridge->restart_timeout = bridge->backoff_base;
897
0
}
898
899
static void bridge__update_backoff(struct mosquitto__bridge *bridge)
900
0
{
901
0
  if(!bridge) return;
902
0
  if(!bridge->backoff_cap) return; /* skip if not using jitter */
903
904
0
  if (bridge->connected_at && db.now_s - bridge->connected_at >= bridge->stable_connection_period) {
905
0
    log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s connection was stable enough, resetting backoff", bridge->name);
906
0
    bridge__backoff_reset(bridge);
907
0
  } else {
908
0
    bridge__backoff_step(bridge);
909
0
  }
910
911
0
  bridge->connected_at = 0;
912
913
0
  log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s next backoff will be %d ms", bridge->name, bridge->restart_timeout);
914
0
}
915
916
static void bridge_check_pending(struct mosquitto *context)
917
0
{
918
0
  int err;
919
0
  socklen_t len;
920
921
0
  if(context->state == mosq_cs_connect_pending){
922
0
    len = sizeof(int);
923
0
    if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
924
0
      if(err == 0){
925
0
        mosquitto__set_state(context, mosq_cs_new);
926
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
927
        if(context->bridge){
928
          bridge__connect_step3(context);
929
        }
930
#endif
931
0
      }else if(err == ECONNREFUSED){
932
0
        do_disconnect(context, MOSQ_ERR_CONN_LOST);
933
0
        return;
934
0
      }
935
0
    }else{
936
0
      do_disconnect(context, MOSQ_ERR_CONN_LOST);
937
0
      return;
938
0
    }
939
0
  }
940
0
}
941
942
static bool reload_if_needed(struct mosquitto *context)
943
0
{
944
0
  int i;
945
946
0
  for(i=0; i<db.config->bridge_count; i++){
947
0
    if(db.config->bridges[i] && !strcmp(context->bridge->name, db.config->bridges[i]->name)){
948
0
      bridge__destroy(context);
949
0
      bridge__new(db.config->bridges[i]);
950
0
      db.config->bridges[i] = NULL;
951
0
      loop__update_next_event(100);
952
0
      return true;
953
0
    }
954
0
  }
955
956
0
  return false;
957
0
}
958
959
void bridge_check(void)
960
0
{
961
0
  static time_t last_check = 0;
962
0
  struct mosquitto *context = NULL;
963
0
  socklen_t len;
964
0
  int i;
965
0
  int rc;
966
0
  int err;
967
968
0
  if(db.now_s <= last_check) return;
969
970
0
  for(i=0; i<db.bridge_count; i++){
971
0
    if(!db.bridges[i]) continue;
972
973
0
    context = db.bridges[i];
974
975
0
    if(net__is_connected(context)){
976
0
      mosquitto__check_keepalive(context);
977
0
      bridge_check_pending(context);
978
979
      /* Check for bridges that are not round robin and not currently
980
       * connected to their primary broker. */
981
0
      if(context->bridge->round_robin == false
982
0
          && context->bridge->cur_address != 0
983
0
          && context->bridge->primary_retry
984
0
          && db.now_s >= context->bridge->primary_retry){
985
986
0
        if(context->bridge->primary_retry_sock == INVALID_SOCKET){
987
0
          rc = net__try_connect(context->bridge->addresses[0].address,
988
0
              context->bridge->addresses[0].port,
989
0
              &context->bridge->primary_retry_sock,
990
0
              context->bridge->bind_address, false);
991
992
0
          if(rc == 0){
993
0
            COMPAT_CLOSE(context->bridge->primary_retry_sock);
994
0
            context->bridge->primary_retry_sock = INVALID_SOCKET;
995
0
            context->bridge->primary_retry = 0;
996
0
            mux__delete(context);
997
0
            net__socket_close(context);
998
0
            context->bridge->cur_address = 0;
999
0
          }
1000
0
        }else{
1001
0
          len = sizeof(int);
1002
0
          if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
1003
0
            if(err == 0){
1004
0
              COMPAT_CLOSE(context->bridge->primary_retry_sock);
1005
0
              context->bridge->primary_retry_sock = INVALID_SOCKET;
1006
0
              context->bridge->primary_retry = 0;
1007
0
              mux__delete(context);
1008
0
              net__socket_close(context);
1009
0
              context->bridge->cur_address = context->bridge->address_count-1;
1010
0
            }else{
1011
0
              COMPAT_CLOSE(context->bridge->primary_retry_sock);
1012
0
              context->bridge->primary_retry_sock = INVALID_SOCKET;
1013
0
              context->bridge->primary_retry = db.now_s+5;
1014
0
              loop__update_next_event(5000);
1015
0
            }
1016
0
          }else{
1017
0
            COMPAT_CLOSE(context->bridge->primary_retry_sock);
1018
0
            context->bridge->primary_retry_sock = INVALID_SOCKET;
1019
0
            context->bridge->primary_retry = db.now_s+5;
1020
0
            loop__update_next_event(5000);
1021
0
          }
1022
0
        }
1023
0
      }
1024
0
    }else{
1025
0
      if(reload_if_needed(context)) continue;
1026
1027
      /* Want to try to restart the bridge connection */
1028
0
      if(!context->bridge->restart_t){
1029
0
        bridge__update_backoff(context->bridge);
1030
0
        context->bridge->restart_t = 1000*db.now_s+context->bridge->restart_timeout;
1031
0
        context->bridge->cur_address++;
1032
0
        if(context->bridge->cur_address == context->bridge->address_count){
1033
0
          context->bridge->cur_address = 0;
1034
0
        }
1035
0
        loop__update_next_event(context->bridge->restart_timeout);
1036
0
      }else{
1037
0
        if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
1038
0
            || (context->bridge->start_type == bst_automatic && 1000*db.now_s >= context->bridge->restart_t)){
1039
1040
#if defined(__GLIBC__) && defined(WITH_ADNS)
1041
          if(context->adns){
1042
            /* Connection attempted, waiting on DNS lookup */
1043
            rc = gai_error(context->adns);
1044
            if(rc == EAI_INPROGRESS){
1045
              /* Just keep on waiting */
1046
            }else if(rc == 0){
1047
              rc = bridge__connect_step2(context);
1048
              if(rc == MOSQ_ERR_SUCCESS){
1049
                mux__new(context);
1050
                if(context->out_packet){
1051
                  mux__add_out(context);
1052
                }
1053
              }else if(rc == MOSQ_ERR_CONN_PENDING){
1054
                mux__new(context);
1055
                mux__add_out(context);
1056
                context->bridge->restart_t = 0;
1057
              }else{
1058
                context->bridge->cur_address++;
1059
                if(context->bridge->cur_address == context->bridge->address_count){
1060
                  context->bridge->cur_address = 0;
1061
                }
1062
                context->bridge->restart_t = 0;
1063
              }
1064
            }else{
1065
              /* Need to retry */
1066
              if(context->adns->ar_result){
1067
                freeaddrinfo(context->adns->ar_result);
1068
              }
1069
              mosquitto_FREE(context->adns);
1070
              context->bridge->restart_t = 0;
1071
            }
1072
          }else{
1073
            rc = bridge__connect_step1(context);
1074
            if(rc){
1075
              context->bridge->cur_address++;
1076
              if(context->bridge->cur_address == context->bridge->address_count){
1077
                context->bridge->cur_address = 0;
1078
              }
1079
            }else{
1080
              /* Short wait for ADNS lookup */
1081
              context->bridge->restart_t = 1;
1082
              loop__update_next_event(1000);
1083
            }
1084
          }
1085
#else
1086
0
          {
1087
0
            rc = bridge__connect(context);
1088
0
            context->bridge->restart_t = 0;
1089
0
            if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING){
1090
0
              if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
1091
0
                context->bridge->primary_retry = db.now_s + 5;
1092
0
                loop__update_next_event(5000);
1093
0
              }
1094
0
              mux__new(context);
1095
0
              if(context->out_packet){
1096
0
                mux__add_out(context);
1097
0
              }
1098
0
            }else{
1099
0
              context->bridge->cur_address++;
1100
0
              if(context->bridge->cur_address == context->bridge->address_count){
1101
0
                context->bridge->cur_address = 0;
1102
0
              }
1103
0
            }
1104
0
          }
1105
0
#endif
1106
0
        }
1107
0
      }
1108
0
    }
1109
0
  }
1110
0
}
1111
1112
#endif