Coverage Report

Created: 2023-09-19 06:58

/src/mosquitto/src/bridge.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <stdio.h>
24
#include <string.h>
25
26
#ifndef WIN32
27
#include <netdb.h>
28
#include <sys/socket.h>
29
#include <netinet/in.h>
30
#include <netinet/tcp.h>
31
#else
32
#include <winsock2.h>
33
#include <ws2tcpip.h>
34
#endif
35
36
#ifndef WIN32
37
#include <unistd.h>
38
#else
39
#include <process.h>
40
#include <winsock2.h>
41
#include <ws2tcpip.h>
42
#endif
43
44
#include "mqtt_protocol.h"
45
#include "mosquitto.h"
46
#include "mosquitto_broker_internal.h"
47
#include "mosquitto_internal.h"
48
#include "net_mosq.h"
49
#include "memory_mosq.h"
50
#include "packet_mosq.h"
51
#include "send_mosq.h"
52
#include "sys_tree.h"
53
#include "time_mosq.h"
54
#include "tls_mosq.h"
55
#include "util_mosq.h"
56
#include "will_mosq.h"
57
#include "utlist.h"
58
59
#ifdef WITH_BRIDGE
60
61
static void bridge__update_backoff(struct mosquitto__bridge *bridge);
62
#if defined(__GLIBC__) && defined(WITH_ADNS)
63
static int bridge__connect_step1(struct mosquitto *context);
64
static int bridge__connect_step2(struct mosquitto *context);
65
#endif
66
static void bridge__packet_cleanup(struct mosquitto *context);
67
68
static struct mosquitto *bridge__new(struct mosquitto__bridge *bridge)
69
0
{
70
0
  struct mosquitto *new_context = NULL;
71
0
  struct mosquitto **bridges;
72
0
  char *local_id;
73
74
0
  assert(bridge);
75
76
0
  local_id = mosquitto__strdup(bridge->local_clientid);
77
0
  if(!local_id){
78
0
    return NULL;
79
0
  }
80
81
0
  HASH_FIND(hh_id, db.contexts_by_id, local_id, strlen(local_id), new_context);
82
0
  if(new_context){
83
    /* (possible from persistent db) */
84
0
    mosquitto__FREE(local_id);
85
0
  }else{
86
    /* id wasn't found, so generate a new context */
87
0
    new_context = context__init();
88
0
    if(!new_context){
89
0
      mosquitto__FREE(local_id);
90
0
      return NULL;
91
0
    }
92
0
    new_context->id = local_id;
93
0
    context__add_to_by_id(new_context);
94
0
  }
95
0
  new_context->transport = mosq_t_tcp;
96
0
  new_context->bridge = bridge;
97
0
  new_context->is_bridge = true;
98
99
0
  new_context->username = bridge->remote_username;
100
0
  new_context->password = bridge->remote_password;
101
102
0
#ifdef WITH_TLS
103
0
  new_context->tls_cafile = bridge->tls_cafile;
104
0
  new_context->tls_capath = bridge->tls_capath;
105
0
  new_context->tls_certfile = bridge->tls_certfile;
106
0
  new_context->tls_keyfile = bridge->tls_keyfile;
107
0
  new_context->tls_cert_reqs = SSL_VERIFY_PEER;
108
0
  new_context->tls_ocsp_required = bridge->tls_ocsp_required;
109
0
  new_context->tls_version = bridge->tls_version;
110
0
  new_context->tls_insecure = bridge->tls_insecure;
111
0
  new_context->tls_alpn = bridge->tls_alpn;
112
0
  new_context->tls_ciphers = bridge->tls_ciphers;
113
0
  new_context->tls_13_ciphers = bridge->tls_13_ciphers;
114
0
  new_context->tls_engine = NULL;
115
0
  new_context->tls_keyform = mosq_k_pem;
116
0
  new_context->tls_use_os_certs = bridge->tls_use_os_certs;
117
0
  new_context->ssl_ctx_defaults = true;
118
0
#ifdef FINAL_WITH_TLS_PSK
119
0
  new_context->tls_psk_identity = bridge->tls_psk_identity;
120
0
  new_context->tls_psk = bridge->tls_psk;
121
0
#endif
122
0
#endif
123
124
0
  bridge->try_private_accepted = true;
125
0
  if(bridge->clean_start_local == -1){
126
    /* default to "regular" clean start setting */
127
0
    bridge->clean_start_local = bridge->clean_start;
128
0
  }
129
0
  new_context->retain_available = bridge->outgoing_retain;
130
0
  new_context->protocol = bridge->protocol_version;
131
0
  if(!bridge->clean_start_local){
132
0
    new_context->session_expiry_interval = UINT32_MAX;
133
0
  }
134
135
0
  bridges = mosquitto__realloc(db.bridges, (size_t)(db.bridge_count+1)*sizeof(struct mosquitto *));
136
0
  if(bridges){
137
0
    db.bridges = bridges;
138
0
    db.bridge_count++;
139
0
    db.bridges[db.bridge_count-1] = new_context;
140
0
  }else{
141
0
    return NULL;
142
0
  }
143
144
0
  return new_context;
145
0
}
146
147
static void bridge__destroy(struct mosquitto *context)
148
0
{
149
0
  send__disconnect(context, MQTT_RC_SUCCESS, NULL);
150
0
  context__cleanup(context, true);
151
0
}
152
153
void bridge__start_all(void)
154
0
{
155
0
  int i;
156
157
0
  for(i=0; i<db.config->bridge_count; i++){
158
0
    struct mosquitto *context;
159
0
    int ret;
160
161
0
    context = bridge__new(db.config->bridges[i]);
162
0
    if(!context){
163
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
164
0
      return;
165
0
    }
166
0
    assert(context);
167
168
#if defined(__GLIBC__) && defined(WITH_ADNS)
169
    context->bridge->restart_t = 1; /* force quick restart of bridge */
170
    loop__update_next_event(1000);
171
    ret = bridge__connect_step1(context);
172
#else
173
0
    ret = bridge__connect(context);
174
0
#endif
175
176
0
    if (ret && ret != MOSQ_ERR_CONN_PENDING){
177
0
      log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect bridge %s.",
178
0
          context->bridge->name);
179
0
    }
180
181
0
    db.config->bridges[i] = NULL;
182
0
  }
183
0
}
184
185
static int bridge__set_tcp_keepalive(struct mosquitto *context)
186
0
{
187
0
  unsigned int idle = context->bridge->tcp_keepalive_idle;
188
0
  unsigned int interval = context->bridge->tcp_keepalive_interval;
189
0
  unsigned int counter = context->bridge->tcp_keepalive_counter;
190
0
  unsigned int enabled = 1;
191
0
  bool ret;
192
193
0
  if(idle == 0 || interval == 0 || counter == 0) return MOSQ_ERR_SUCCESS;
194
195
#ifdef WIN32
196
  ret =
197
    setsockopt(context->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&enabled, sizeof(enabled)) ||
198
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPIDLE, (char *)&idle, sizeof(idle)) ||
199
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPINTVL, (char *)&interval, sizeof(interval)) ||
200
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPCNT, (char *)&counter, sizeof(counter));
201
#else
202
0
  ret =
203
0
    setsockopt(context->sock, SOL_SOCKET, SO_KEEPALIVE, (const void*)&enabled, sizeof(enabled)) ||
204
0
#ifndef __APPLE__
205
0
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPIDLE, (const void*)&idle, sizeof(idle)) ||
206
0
#endif
207
0
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPINTVL, (const void*)&interval, sizeof(interval)) ||
208
0
    setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPCNT, (const void*)&counter, sizeof(counter));
209
0
#endif
210
211
0
  if(ret) return MOSQ_ERR_UNKNOWN;
212
213
0
  return MOSQ_ERR_SUCCESS;
214
0
}
215
216
#ifdef WITH_TCP_USER_TIMEOUT
217
0
static int bridge__set_tcp_user_timeout(struct mosquitto *context) {
218
0
  int timeout = context->bridge->tcp_user_timeout;
219
0
  if(timeout >= 0) {
220
0
    if(setsockopt(context->sock, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&timeout, sizeof(timeout))) {
221
0
      return MOSQ_ERR_UNKNOWN;
222
0
    }
223
0
  }
224
225
0
  return MOSQ_ERR_SUCCESS;
226
0
}
227
#endif
228
229
#if defined(__GLIBC__) && defined(WITH_ADNS)
230
static int bridge__connect_step1(struct mosquitto *context)
231
{
232
  int rc;
233
  char *notification_topic;
234
  size_t notification_topic_len;
235
  uint8_t notification_payload;
236
  struct mosquitto__bridge_topic *cur_topic;
237
  uint8_t qos;
238
239
  if(!context || !context->bridge) return MOSQ_ERR_INVAL;
240
241
  mosquitto__set_state(context, mosq_cs_new);
242
  context->sock = INVALID_SOCKET;
243
  context->last_msg_in = db.now_s;
244
  context->next_msg_out = db.now_s + context->bridge->keepalive;
245
  context->keepalive = context->bridge->keepalive;
246
  context->clean_start = context->bridge->clean_start;
247
  context->in_packet.payload = NULL;
248
  context->ping_t = 0;
249
  context->bridge->lazy_reconnect = false;
250
  context->maximum_packet_size = context->bridge->maximum_packet_size;
251
  bridge__packet_cleanup(context);
252
  db__message_reconnect_reset(context);
253
254
  db__messages_delete(context, false);
255
256
  /* Delete all local subscriptions even for clean_start==false. We don't
257
   * remove any messages and the next loop carries out the resubscription
258
   * anyway. This means any unwanted subs will be removed.
259
   */
260
  sub__clean_session(context);
261
262
  LL_FOREACH(context->bridge->topics, cur_topic){
263
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
264
      log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
265
      if(cur_topic->qos > context->max_qos){
266
        qos = context->max_qos;
267
      }else{
268
        qos = cur_topic->qos;
269
      }
270
      struct mosquitto_subscription sub;
271
      sub.topic_filter = cur_topic->local_topic;
272
      sub.identifier = 0;
273
      sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
274
      if(sub__add(context, &sub) > 0){
275
        return 1;
276
      }
277
      retain__queue(context, &sub);
278
    }
279
  }
280
281
  if(context->bridge->notifications){
282
    if(context->max_qos == 0){
283
      qos = 0;
284
    }else{
285
      qos = 1;
286
    }
287
    if(context->bridge->notification_topic){
288
      if(!context->bridge->initial_notification_done){
289
        notification_payload = '0';
290
        db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
291
        context->bridge->initial_notification_done = true;
292
      }
293
      notification_payload = '0';
294
      rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
295
      if(rc != MOSQ_ERR_SUCCESS){
296
        return rc;
297
      }
298
    }else{
299
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
300
      notification_topic = mosquitto__malloc(sizeof(char)*(notification_topic_len+1));
301
      if(!notification_topic) return MOSQ_ERR_NOMEM;
302
303
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
304
305
      if(!context->bridge->initial_notification_done){
306
        notification_payload = '0';
307
        db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
308
        context->bridge->initial_notification_done = true;
309
      }
310
311
      notification_payload = '0';
312
      rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
313
      mosquitto__FREE(notification_topic);
314
      if(rc != MOSQ_ERR_SUCCESS){
315
        return rc;
316
      }
317
    }
318
  }
319
320
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge (step 1) %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
321
  rc = net__try_connect_step1(context, context->bridge->addresses[context->bridge->cur_address].address);
322
  if(rc > 0 ){
323
    if(rc == MOSQ_ERR_TLS){
324
      mux__delete(context);
325
      net__socket_close(context);
326
      return rc; /* Error already printed */
327
    }else if(rc == MOSQ_ERR_ERRNO){
328
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
329
    }else if(rc == MOSQ_ERR_EAI){
330
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
331
    }
332
333
    return rc;
334
  }
335
336
  return MOSQ_ERR_SUCCESS;
337
}
338
339
340
static int bridge__connect_step2(struct mosquitto *context)
341
{
342
  int rc;
343
344
  if(!context || !context->bridge) return MOSQ_ERR_INVAL;
345
346
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge (step 2) %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
347
  rc = net__try_connect_step2(context, context->bridge->addresses[context->bridge->cur_address].port, &context->sock);
348
  if(rc > 0){
349
    if(rc == MOSQ_ERR_TLS){
350
      mux__delete(context);
351
      net__socket_close(context);
352
      return rc; /* Error already printed */
353
    }else if(rc == MOSQ_ERR_ERRNO){
354
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
355
    }else if(rc == MOSQ_ERR_EAI){
356
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
357
    }
358
359
    return rc;
360
  }
361
362
  HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(context->sock), context);
363
364
  if(rc == MOSQ_ERR_CONN_PENDING){
365
    mosquitto__set_state(context, mosq_cs_connect_pending);
366
    mux__add_out(context);
367
  }
368
  return rc;
369
}
370
371
372
int bridge__connect_step3(struct mosquitto *context)
373
{
374
  int rc;
375
  mosquitto_property receive_maximum;
376
  mosquitto_property session_expiry_interval;
377
  mosquitto_property topic_alias_max;
378
  mosquitto_property *properties = NULL;
379
380
  rc = net__socket_connect_step3(context, context->bridge->addresses[context->bridge->cur_address].address);
381
  if(rc > 0){
382
    if(rc == MOSQ_ERR_TLS){
383
      mux__delete(context);
384
      net__socket_close(context);
385
      return rc; /* Error already printed */
386
    }else if(rc == MOSQ_ERR_ERRNO){
387
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
388
    }else if(rc == MOSQ_ERR_EAI){
389
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
390
    }
391
392
    return rc;
393
  }
394
395
  if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
396
    context->bridge->primary_retry = db.now_s + 5;
397
    loop__update_next_event(5000);
398
  }
399
400
  if (bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN;
401
#ifdef WITH_TCP_USER_TIMEOUT
402
  if(bridge__set_tcp_user_timeout(context)) return MOSQ_ERR_UNKNOWN;
403
#endif
404
405
  if(context->bridge->receive_maximum != 0){
406
    receive_maximum.value.i16 = context->bridge->receive_maximum;
407
    receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
408
    receive_maximum.property_type = MQTT_PROP_TYPE_INT16;
409
    receive_maximum.client_generated = false;
410
    receive_maximum.next = properties;
411
    properties = &receive_maximum;
412
  }
413
  if(context->bridge->session_expiry_interval != 0){
414
    session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
415
    session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
416
    session_expiry_interval.property_type = MQTT_PROP_TYPE_INT32;
417
    session_expiry_interval.client_generated = false;
418
    session_expiry_interval.next = properties;
419
    properties = &session_expiry_interval;
420
  }
421
  if(context->bridge->max_topic_alias != 0){
422
    topic_alias_max.value.i16 = context->bridge->max_topic_alias;
423
    topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
424
    topic_alias_max.property_type = MQTT_PROP_TYPE_INT16;
425
    topic_alias_max.client_generated = false;
426
    topic_alias_max.next = properties;
427
    properties = &topic_alias_max;
428
  }
429
430
  rc = send__connect(context, context->keepalive, context->clean_start, properties);
431
  if(rc == MOSQ_ERR_SUCCESS){
432
    return MOSQ_ERR_SUCCESS;
433
  }else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
434
    return MOSQ_ERR_SUCCESS;
435
  }else{
436
    if(rc == MOSQ_ERR_TLS){
437
      return rc; /* Error already printed */
438
    }else if(rc == MOSQ_ERR_ERRNO){
439
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
440
    }else if(rc == MOSQ_ERR_EAI){
441
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
442
    }
443
    mux__delete(context);
444
    net__socket_close(context);
445
    return rc;
446
  }
447
}
448
#else
449
450
int bridge__connect(struct mosquitto *context)
451
0
{
452
0
  int rc, rc2;
453
0
  char *notification_topic = NULL;
454
0
  size_t notification_topic_len;
455
0
  uint8_t notification_payload;
456
0
  struct mosquitto__bridge_topic *cur_topic;
457
0
  uint8_t qos;
458
0
  mosquitto_property receive_maximum;
459
0
  mosquitto_property session_expiry_interval;
460
0
  mosquitto_property topic_alias_max;
461
0
  mosquitto_property *properties = NULL;
462
463
0
  if(!context || !context->bridge) return MOSQ_ERR_INVAL;
464
465
0
  mosquitto__set_state(context, mosq_cs_new);
466
0
  context->sock = INVALID_SOCKET;
467
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
468
0
  context->last_msg_in = db.now_s;
469
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
470
0
  context->next_msg_out = db.now_s + context->bridge->keepalive;
471
0
  context->keepalive = context->bridge->keepalive;
472
0
  context->clean_start = context->bridge->clean_start;
473
0
  context->in_packet.payload = NULL;
474
0
  context->ping_t = 0;
475
0
  context->bridge->lazy_reconnect = false;
476
0
  context->maximum_packet_size = context->bridge->maximum_packet_size;
477
0
  bridge__packet_cleanup(context);
478
0
  db__message_reconnect_reset(context);
479
480
0
  db__messages_delete(context, false);
481
482
  /* Delete all local subscriptions even for clean_start==false. We don't
483
   * remove any messages and the next loop carries out the resubscription
484
   * anyway. This means any unwanted subs will be removed.
485
   */
486
0
  sub__clean_session(context);
487
488
0
  LL_FOREACH(context->bridge->topics, cur_topic){
489
0
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
490
0
      log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
491
0
      if(cur_topic->qos > context->max_qos){
492
0
        qos = context->max_qos;
493
0
      }else{
494
0
        qos = cur_topic->qos;
495
0
      }
496
0
      struct mosquitto_subscription sub;
497
0
      sub.topic_filter = cur_topic->local_topic;
498
0
      sub.identifier = 0;
499
0
      sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
500
0
      if(sub__add(context, &sub) > 0){
501
0
        return 1;
502
0
      }
503
0
    }
504
0
  }
505
506
0
  if(context->bridge->notifications){
507
0
    if(context->max_qos == 0){
508
0
      qos = 0;
509
0
    }else{
510
0
      qos = 1;
511
0
    }
512
0
    if(context->bridge->notification_topic){
513
0
      if(!context->bridge->initial_notification_done){
514
0
        notification_payload = '0';
515
0
        db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
516
0
        context->bridge->initial_notification_done = true;
517
0
      }
518
519
0
      notification_payload = '0';
520
0
      rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
521
0
      if(rc != MOSQ_ERR_SUCCESS){
522
0
        return rc;
523
0
      }
524
0
    }else{
525
0
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
526
0
      notification_topic = mosquitto__malloc(sizeof(char)*(notification_topic_len+1));
527
0
      if(!notification_topic) return MOSQ_ERR_NOMEM;
528
529
0
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
530
531
0
      if(!context->bridge->initial_notification_done){
532
0
        notification_payload = '0';
533
0
        db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
534
0
        context->bridge->initial_notification_done = true;
535
0
      }
536
537
0
      notification_payload = '0';
538
0
      rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
539
0
      if(rc != MOSQ_ERR_SUCCESS){
540
0
        mosquitto__FREE(notification_topic);
541
0
        return rc;
542
0
      }
543
0
      mosquitto__FREE(notification_topic);
544
0
    }
545
0
  }
546
547
0
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
548
0
  rc = net__socket_connect(context,
549
0
      context->bridge->addresses[context->bridge->cur_address].address,
550
0
      context->bridge->addresses[context->bridge->cur_address].port,
551
0
      context->bridge->bind_address,
552
0
      false);
553
554
0
  if(rc > 0){
555
0
    if(rc == MOSQ_ERR_TLS){
556
0
      mux__delete(context);
557
0
      net__socket_close(context);
558
0
      return rc; /* Error already printed */
559
0
    }else if(rc == MOSQ_ERR_ERRNO){
560
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
561
0
    }else if(rc == MOSQ_ERR_EAI){
562
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
563
0
    }
564
565
0
    return rc;
566
0
  }else if(rc == MOSQ_ERR_CONN_PENDING){
567
0
    mosquitto__set_state(context, mosq_cs_connect_pending);
568
0
    mux__add_out(context);
569
0
  }
570
571
0
  HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(context->sock), context);
572
573
0
  if (bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN;
574
0
#ifdef WITH_TCP_USER_TIMEOUT
575
0
  if(bridge__set_tcp_user_timeout(context)) return MOSQ_ERR_UNKNOWN;
576
0
#endif
577
578
0
  if(context->bridge->receive_maximum != 0){
579
0
    receive_maximum.value.i16 = context->bridge->receive_maximum;
580
0
    receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
581
0
    receive_maximum.property_type = MQTT_PROP_TYPE_INT16;
582
0
    receive_maximum.client_generated = false;
583
0
    receive_maximum.next = properties;
584
0
    properties = &receive_maximum;
585
0
  }
586
0
  if(context->bridge->session_expiry_interval != 0){
587
0
    session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
588
0
    session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
589
0
    session_expiry_interval.property_type = MQTT_PROP_TYPE_INT32;
590
0
    session_expiry_interval.client_generated = false;
591
0
    session_expiry_interval.next = properties;
592
0
    properties = &session_expiry_interval;
593
0
  }
594
0
  if(context->bridge->max_topic_alias != 0){
595
0
    topic_alias_max.value.i16 = context->bridge->max_topic_alias;
596
0
    topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
597
0
    topic_alias_max.property_type = MQTT_PROP_TYPE_INT16;
598
0
    topic_alias_max.client_generated = false;
599
0
    topic_alias_max.next = properties;
600
0
    properties = &topic_alias_max;
601
0
  }
602
603
0
  rc2 = send__connect(context, context->keepalive, context->clean_start, properties);
604
0
  if(rc2 == MOSQ_ERR_SUCCESS){
605
0
    return rc;
606
0
  }else if(rc2 == MOSQ_ERR_ERRNO && errno == ENOTCONN){
607
0
    return MOSQ_ERR_SUCCESS;
608
0
  }else{
609
0
    if(rc2 == MOSQ_ERR_TLS){
610
0
      return rc2; /* Error already printed */
611
0
    }else if(rc2 == MOSQ_ERR_ERRNO){
612
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
613
0
    }else if(rc2 == MOSQ_ERR_EAI){
614
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
615
0
    }
616
0
    mux__delete(context);
617
0
    net__socket_close(context);
618
0
    return rc2;
619
0
  }
620
0
}
621
#endif
622
623
624
int bridge__on_connect(struct mosquitto *context)
625
0
{
626
0
  char *notification_topic;
627
0
  size_t notification_topic_len;
628
0
  char notification_payload;
629
0
  struct mosquitto__bridge_topic *cur_topic;
630
0
  int sub_opts;
631
0
  bool retain = true;
632
0
  uint8_t qos;
633
634
0
  if(context->bridge->notifications){
635
0
    if(context->max_qos == 0){
636
0
      qos = 0;
637
0
    }else{
638
0
      qos = 1;
639
0
    }
640
0
    if(!context->retain_available){
641
0
      retain = false;
642
0
    }
643
0
    notification_payload = '1';
644
0
    if(context->bridge->notification_topic){
645
0
      if(!context->bridge->notifications_local_only){
646
0
        if(send__real_publish(context, mosquitto__mid_generate(context),
647
0
            context->bridge->notification_topic, 1, &notification_payload, qos, retain, 0, 0, NULL, 0)){
648
649
0
          return 1;
650
0
        }
651
0
      }
652
0
      db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
653
0
    }else{
654
0
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
655
0
      notification_topic = mosquitto__malloc(sizeof(char)*(notification_topic_len+1));
656
0
      if(!notification_topic) return MOSQ_ERR_NOMEM;
657
658
0
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
659
0
      notification_payload = '1';
660
0
      if(!context->bridge->notifications_local_only){
661
0
        if(send__real_publish(context, mosquitto__mid_generate(context),
662
0
            notification_topic, 1, &notification_payload, qos, retain, 0, 0, NULL, 0)){
663
664
0
          mosquitto__FREE(notification_topic);
665
0
          return 1;
666
0
        }
667
0
      }
668
0
      db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
669
0
      mosquitto__FREE(notification_topic);
670
0
    }
671
0
  }
672
673
0
  LL_FOREACH(context->bridge->topics, cur_topic){
674
0
    if(cur_topic->direction == bd_in || cur_topic->direction == bd_both){
675
0
      if(cur_topic->qos > context->max_qos){
676
0
        sub_opts = context->max_qos;
677
0
      }else{
678
0
        sub_opts = cur_topic->qos;
679
0
      }
680
0
      if(context->bridge->protocol_version == mosq_p_mqtt5){
681
0
        sub_opts = sub_opts
682
0
          | MQTT_SUB_OPT_NO_LOCAL
683
0
          | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
684
0
          | MQTT_SUB_OPT_SEND_RETAIN_ALWAYS;
685
0
      }
686
0
      if(send__subscribe(context, NULL, 1, &cur_topic->remote_topic, sub_opts, NULL)){
687
0
        return 1;
688
0
      }
689
0
    }else{
690
0
      if(context->bridge->attempt_unsubscribe){
691
0
        if(send__unsubscribe(context, NULL, 1, &cur_topic->remote_topic, NULL)){
692
          /* direction = inwards only. This means we should not be subscribed
693
          * to the topic. It is possible that we used to be subscribed to
694
          * this topic so unsubscribe. */
695
0
          return 1;
696
0
        }
697
0
      }
698
0
    }
699
0
  }
700
0
  struct mosquitto_subscription sub;
701
0
  memset(&sub, 0, sizeof(sub));
702
0
  LL_FOREACH(context->bridge->topics, cur_topic){
703
0
    sub.topic_filter = cur_topic->local_topic;
704
0
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
705
0
      if(cur_topic->qos > context->max_qos){
706
0
        sub.options = context->max_qos;
707
0
      }else{
708
0
        sub.options = cur_topic->qos;
709
0
      }
710
0
      retain__queue(context, &sub);
711
0
    }
712
0
  }
713
714
0
  context->bridge->connected_at = db.now_s;
715
716
0
  return MOSQ_ERR_SUCCESS;
717
0
}
718
719
720
int bridge__register_local_connections(void)
721
0
{
722
0
  struct mosquitto *context, *ctxt_tmp = NULL;
723
724
0
  HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
725
0
    if(context->bridge){
726
0
      if(mux__new(context)){
727
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error in initial bridge registration: %s", strerror(errno));
728
0
        return MOSQ_ERR_UNKNOWN;
729
0
      }
730
0
      mux__add_out(context);
731
0
    }
732
0
  }
733
0
  return MOSQ_ERR_SUCCESS;
734
0
}
735
736
737
void bridge__reload(void)
738
0
{
739
0
  int i;
740
0
  int j;
741
742
  // destroy old bridges that dissappeared
743
0
  for(i=0;i<db.bridge_count;i++){
744
0
    for(j=0;j<db.config->bridge_count;j++){
745
0
      if(!strcmp(db.bridges[i]->bridge->name, db.config->bridges[j]->name)) break;
746
0
    }
747
748
0
    if(j==db.config->bridge_count){
749
0
      bridge__destroy(db.bridges[i]);
750
0
    }
751
0
  }
752
753
0
  for(i=0;i<db.config->bridge_count;i++){
754
0
    for(j=0;j<db.bridge_count; j++){
755
0
      if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)) break;
756
0
    }
757
758
0
    if(j==db.bridge_count){
759
      // a new bridge was found, create it
760
0
      bridge__new(db.config->bridges[i]);
761
0
      db.config->bridges[i] = NULL;
762
0
      continue;
763
0
    }
764
765
0
    if(db.config->bridges[i]->reload_type == brt_immediate){
766
      // in this case, an existing bridge should match
767
0
      for(j=0;j<db.bridge_count;j++){
768
0
        if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)) break;
769
0
      }
770
771
0
      assert(j<db.bridge_count);
772
0
      db.bridges[j]->will_delay_interval = 0;
773
0
      bridge__destroy(db.bridges[j]);
774
0
      bridge__new(db.config->bridges[i]);
775
0
      db.config->bridges[i] = NULL;
776
0
    }
777
0
  }
778
0
}
779
780
void bridge__db_cleanup(void)
781
0
{
782
0
  int i;
783
784
0
  for(i=0; i<db.bridge_count; i++){
785
0
    if(db.bridges[i]){
786
0
      context__cleanup(db.bridges[i], true);
787
0
    }
788
0
  }
789
0
  mosquitto__FREE(db.bridges);
790
0
}
791
792
793
void bridge__cleanup(struct mosquitto *context)
794
0
{
795
0
  int i;
796
797
0
  assert(db.bridge_count > 0);
798
799
0
  for(i=0; i<db.bridge_count; i++){
800
0
    if(db.bridges[i] == context){
801
0
      db.bridges[i] = db.bridges[db.bridge_count-1];
802
0
      break;
803
0
    }
804
0
  }
805
806
0
  db.bridge_count--;
807
0
  db.bridges = mosquitto__realloc(db.bridges, (unsigned) db.bridge_count * sizeof(db.bridges[0]));
808
809
0
  mosquitto__FREE(context->bridge->name);
810
0
  mosquitto__FREE(context->bridge->local_clientid);
811
0
  mosquitto__FREE(context->bridge->local_username);
812
0
  mosquitto__FREE(context->bridge->local_password);
813
814
0
  if(context->bridge->remote_clientid != context->id){
815
0
    mosquitto__FREE(context->bridge->remote_clientid);
816
0
  }
817
0
  context->bridge->remote_clientid = NULL;
818
819
0
  if(context->bridge->remote_username != context->username){
820
0
    mosquitto__FREE(context->bridge->remote_username);
821
0
  }
822
0
  context->bridge->remote_username = NULL;
823
824
0
  if(context->bridge->remote_password != context->password){
825
0
    mosquitto__FREE(context->bridge->remote_password);
826
0
  }
827
0
  context->bridge->remote_password = NULL;
828
0
#ifdef WITH_TLS
829
0
  if(context->ssl_ctx){
830
0
    SSL_CTX_free(context->ssl_ctx);
831
0
    context->ssl_ctx = NULL;
832
0
  }
833
0
#endif
834
835
0
  for(i=0; i<context->bridge->address_count; i++){
836
0
    mosquitto__FREE(context->bridge->addresses[i].address);
837
0
  }
838
839
0
  mosquitto__FREE(context->bridge->addresses);
840
841
0
  config__bridge_cleanup(context->bridge);
842
0
  context->bridge = NULL;
843
0
}
844
845
846
static void bridge__packet_cleanup(struct mosquitto *context)
847
0
{
848
0
  struct mosquitto__packet *packet;
849
0
  if(!context) return;
850
851
0
    while(context->out_packet){
852
0
    packet = context->out_packet;
853
0
    context->out_packet = context->out_packet->next;
854
0
    mosquitto__FREE(packet);
855
0
  }
856
0
  context->out_packet = NULL;
857
0
  context->out_packet_last = NULL;
858
0
  metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
859
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
860
0
  context->out_packet_count = 0;
861
0
  context->out_packet_bytes = 0;
862
863
0
  packet__cleanup(&(context->in_packet));
864
0
}
865
866
/*
 * Return a random integer in the half-open range [low, high).
 *
 * An empty or inverted range (high <= low) yields low instead of dividing
 * by zero. The random value is handled as unsigned so that no abs() call is
 * needed (abs(INT_MIN) is undefined), and it starts at zero so the result is
 * well defined (low) even if util__random_bytes() does not fill it.
 */
static int rand_between(int low, int high)
{
  unsigned int r = 0;
  unsigned int span;

  if(high <= low){
    return low;
  }

  /* Unsigned subtraction cannot overflow, even for a range wider than INT_MAX. */
  span = (unsigned int)high - (unsigned int)low;

  util__random_bytes(&r, sizeof(r));

  /* low + (r % span) lies in [low, high); do the addition unsigned and
   * convert once at the end. */
  return (int)((unsigned int)low + (r % span));
}
872
873
static void bridge__backoff_step(struct mosquitto__bridge *bridge)
874
0
{
875
  /*
876
    “Decorrelated Jitter” calculation, according to:
877
      https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
878
  */
879
880
0
  bridge->restart_timeout = rand_between(bridge->backoff_base, bridge->restart_timeout * 3);
881
0
  if(bridge->restart_timeout > bridge->backoff_cap){
882
0
    bridge->restart_timeout = bridge->backoff_cap;
883
0
  }
884
0
}
885
886
static void bridge__backoff_reset(struct mosquitto__bridge *bridge)
887
0
{
888
0
  bridge->restart_timeout = bridge->backoff_base;
889
0
}
890
891
static void bridge__update_backoff(struct mosquitto__bridge *bridge)
892
0
{
893
0
  if(!bridge) return;
894
0
  if(!bridge->backoff_cap) return; /* skip if not using jitter */
895
896
0
  if (bridge->connected_at && db.now_s - bridge->connected_at >= bridge->stable_connection_period) {
897
0
    log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s connection was stable enough, resetting backoff", bridge->name);
898
0
    bridge__backoff_reset(bridge);
899
0
  } else {
900
0
    bridge__backoff_step(bridge);
901
0
  }
902
903
0
  bridge->connected_at = 0;
904
905
0
  log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s next backoff will be %d ms", bridge->name, bridge->restart_timeout);
906
0
}
907
908
static void bridge_check_pending(struct mosquitto *context)
909
0
{
910
0
  int err;
911
0
  socklen_t len;
912
913
0
  if(context->state == mosq_cs_connect_pending){
914
0
    len = sizeof(int);
915
0
    if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
916
0
      if(err == 0){
917
0
        mosquitto__set_state(context, mosq_cs_new);
918
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
919
        if(context->bridge){
920
          bridge__connect_step3(context);
921
        }
922
#endif
923
0
      }else if(err == ECONNREFUSED){
924
0
        do_disconnect(context, MOSQ_ERR_CONN_LOST);
925
0
        return;
926
0
      }
927
0
    }else{
928
0
      do_disconnect(context, MOSQ_ERR_CONN_LOST);
929
0
      return;
930
0
    }
931
0
  }
932
0
}
933
934
static bool reload_if_needed(struct mosquitto *context)
935
0
{
936
0
  int i;
937
938
0
  for(i=0;i<db.config->bridge_count;i++){
939
0
    if(db.config->bridges[i] && !strcmp(context->bridge->name, db.config->bridges[i]->name)){
940
0
      bridge__destroy(context);
941
0
      bridge__new(db.config->bridges[i]);
942
0
      db.config->bridges[i] = NULL;
943
0
      loop__update_next_event(100);
944
0
      return true;
945
0
    }
946
0
  }
947
948
0
  return false;
949
0
}
950
951
void bridge_check(void)
952
0
{
953
0
  static time_t last_check = 0;
954
0
  struct mosquitto *context = NULL;
955
0
  socklen_t len;
956
0
  int i;
957
0
  int rc;
958
0
  int err;
959
960
0
  if(db.now_s <= last_check) return;
961
962
0
  for(i=0; i<db.bridge_count; i++){
963
0
    if(!db.bridges[i]) continue;
964
965
0
    context = db.bridges[i];
966
967
0
    if(net__is_connected(context)){
968
0
      mosquitto__check_keepalive(context);
969
0
      bridge_check_pending(context);
970
971
      /* Check for bridges that are not round robin and not currently
972
       * connected to their primary broker. */
973
0
      if(context->bridge->round_robin == false
974
0
          && context->bridge->cur_address != 0
975
0
          && context->bridge->primary_retry
976
0
          && db.now_s >= context->bridge->primary_retry){
977
978
0
        if(context->bridge->primary_retry_sock == INVALID_SOCKET){
979
0
          rc = net__try_connect(context->bridge->addresses[0].address,
980
0
              context->bridge->addresses[0].port,
981
0
              &context->bridge->primary_retry_sock,
982
0
              context->bridge->bind_address, false);
983
984
0
          if(rc == 0){
985
0
            COMPAT_CLOSE(context->bridge->primary_retry_sock);
986
0
            context->bridge->primary_retry_sock = INVALID_SOCKET;
987
0
            context->bridge->primary_retry = 0;
988
0
            mux__delete(context);
989
0
            net__socket_close(context);
990
0
            context->bridge->cur_address = 0;
991
0
          }
992
0
        }else{
993
0
          len = sizeof(int);
994
0
          if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
995
0
            if(err == 0){
996
0
              COMPAT_CLOSE(context->bridge->primary_retry_sock);
997
0
              context->bridge->primary_retry_sock = INVALID_SOCKET;
998
0
              context->bridge->primary_retry = 0;
999
0
              mux__delete(context);
1000
0
              net__socket_close(context);
1001
0
              context->bridge->cur_address = context->bridge->address_count-1;
1002
0
            }else{
1003
0
              COMPAT_CLOSE(context->bridge->primary_retry_sock);
1004
0
              context->bridge->primary_retry_sock = INVALID_SOCKET;
1005
0
              context->bridge->primary_retry = db.now_s+5;
1006
0
              loop__update_next_event(5000);
1007
0
            }
1008
0
          }else{
1009
0
            COMPAT_CLOSE(context->bridge->primary_retry_sock);
1010
0
            context->bridge->primary_retry_sock = INVALID_SOCKET;
1011
0
            context->bridge->primary_retry = db.now_s+5;
1012
0
            loop__update_next_event(5000);
1013
0
          }
1014
0
        }
1015
0
      }
1016
0
    }else{
1017
0
      if(reload_if_needed(context)) continue;
1018
1019
      /* Want to try to restart the bridge connection */
1020
0
      if(!context->bridge->restart_t){
1021
0
        bridge__update_backoff(context->bridge);
1022
0
        context->bridge->restart_t = 1000*db.now_s+context->bridge->restart_timeout;
1023
0
        context->bridge->cur_address++;
1024
0
        if(context->bridge->cur_address == context->bridge->address_count){
1025
0
          context->bridge->cur_address = 0;
1026
0
        }
1027
0
        loop__update_next_event(context->bridge->restart_timeout);
1028
0
      }else{
1029
0
        if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
1030
0
            || (context->bridge->start_type == bst_automatic && 1000*db.now_s >= context->bridge->restart_t)){
1031
1032
#if defined(__GLIBC__) && defined(WITH_ADNS)
1033
          if(context->adns){
1034
            /* Connection attempted, waiting on DNS lookup */
1035
            rc = gai_error(context->adns);
1036
            if(rc == EAI_INPROGRESS){
1037
              /* Just keep on waiting */
1038
            }else if(rc == 0){
1039
              rc = bridge__connect_step2(context);
1040
              if(rc == MOSQ_ERR_SUCCESS){
1041
                mux__new(context);
1042
                if(context->out_packet){
1043
                  mux__add_out(context);
1044
                }
1045
              }else if(rc == MOSQ_ERR_CONN_PENDING){
1046
                mux__new(context);
1047
                mux__add_out(context);
1048
                context->bridge->restart_t = 0;
1049
              }else{
1050
                context->bridge->cur_address++;
1051
                if(context->bridge->cur_address == context->bridge->address_count){
1052
                  context->bridge->cur_address = 0;
1053
                }
1054
                context->bridge->restart_t = 0;
1055
              }
1056
            }else{
1057
              /* Need to retry */
1058
              if(context->adns->ar_result){
1059
                freeaddrinfo(context->adns->ar_result);
1060
              }
1061
              mosquitto__FREE(context->adns);
1062
              context->bridge->restart_t = 0;
1063
            }
1064
          }else{
1065
            rc = bridge__connect_step1(context);
1066
            if(rc){
1067
              context->bridge->cur_address++;
1068
              if(context->bridge->cur_address == context->bridge->address_count){
1069
                context->bridge->cur_address = 0;
1070
              }
1071
            }else{
1072
              /* Short wait for ADNS lookup */
1073
              context->bridge->restart_t = 1;
1074
              loop__update_next_event(1000);
1075
            }
1076
          }
1077
#else
1078
0
          {
1079
0
            rc = bridge__connect(context);
1080
0
            context->bridge->restart_t = 0;
1081
0
            if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING){
1082
0
              if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
1083
0
                context->bridge->primary_retry = db.now_s + 5;
1084
0
                loop__update_next_event(5000);
1085
0
              }
1086
0
              mux__new(context);
1087
0
              if(context->out_packet){
1088
0
                mux__add_out(context);
1089
0
              }
1090
0
            }else{
1091
0
              context->bridge->cur_address++;
1092
0
              if(context->bridge->cur_address == context->bridge->address_count){
1093
0
                context->bridge->cur_address = 0;
1094
0
              }
1095
0
            }
1096
0
          }
1097
0
#endif
1098
0
        }
1099
0
      }
1100
0
    }
1101
0
  }
1102
0
}
1103
1104
#endif