Coverage Report

Created: 2025-11-07 06:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/src/bridge.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <stdio.h>
24
#include <string.h>
25
26
#ifndef WIN32
27
#include <netdb.h>
28
#include <sys/socket.h>
29
#include <netinet/in.h>
30
#include <netinet/tcp.h>
31
#else
32
#include <winsock2.h>
33
#include <ws2tcpip.h>
34
#endif
35
36
#ifndef WIN32
37
#include <unistd.h>
38
#else
39
#include <process.h>
40
#include <winsock2.h>
41
#include <ws2tcpip.h>
42
#endif
43
44
#include "mosquitto.h"
45
#include "mosquitto_broker_internal.h"
46
#include "mosquitto_internal.h"
47
#include "net_mosq.h"
48
#include "packet_mosq.h"
49
#include "property_common.h"
50
#include "send_mosq.h"
51
#include "sys_tree.h"
52
#include "tls_mosq.h"
53
#include "util_mosq.h"
54
#include "will_mosq.h"
55
#include "utlist.h"
56
57
#ifdef WITH_BRIDGE
58
59
static void bridge__update_backoff(struct mosquitto__bridge *bridge);
60
#if defined(__GLIBC__) && defined(WITH_ADNS)
61
static int bridge__connect_step1(struct mosquitto *context);
62
static int bridge__connect_step2(struct mosquitto *context);
63
#endif
64
static void bridge__packet_cleanup(struct mosquitto *context);
65
66
static struct mosquitto *bridge__new(struct mosquitto__bridge *bridge)
67
0
{
68
0
  struct mosquitto *new_context = NULL;
69
0
  struct mosquitto **bridges;
70
0
  char *local_id;
71
72
0
  assert(bridge);
73
74
0
  local_id = mosquitto_strdup(bridge->local_clientid);
75
0
  if(!local_id){
76
0
    return NULL;
77
0
  }
78
79
0
  HASH_FIND(hh_id, db.contexts_by_id, local_id, strlen(local_id), new_context);
80
0
  if(new_context){
81
    /* (possible from persistent db) */
82
0
    mosquitto_FREE(local_id);
83
0
  }else{
84
    /* id wasn't found, so generate a new context */
85
0
    new_context = context__init();
86
0
    if(!new_context){
87
0
      mosquitto_FREE(local_id);
88
0
      return NULL;
89
0
    }
90
0
    new_context->id = local_id;
91
0
    context__add_to_by_id(new_context);
92
0
  }
93
0
  new_context->transport = mosq_t_tcp;
94
0
  new_context->bridge = bridge;
95
0
  new_context->is_bridge = true;
96
97
0
  new_context->username = bridge->remote_username;
98
0
  new_context->password = bridge->remote_password;
99
100
0
#ifdef WITH_TLS
101
0
  new_context->tls_cafile = bridge->tls_cafile;
102
0
  new_context->tls_capath = bridge->tls_capath;
103
0
  new_context->tls_certfile = bridge->tls_certfile;
104
0
  new_context->tls_keyfile = bridge->tls_keyfile;
105
0
  new_context->tls_cert_reqs = SSL_VERIFY_PEER;
106
0
  new_context->tls_ocsp_required = bridge->tls_ocsp_required;
107
0
  new_context->tls_version = bridge->tls_version;
108
0
  new_context->tls_insecure = bridge->tls_insecure;
109
0
  new_context->tls_alpn = bridge->tls_alpn;
110
0
  new_context->tls_ciphers = bridge->tls_ciphers;
111
0
  new_context->tls_13_ciphers = bridge->tls_13_ciphers;
112
0
  new_context->tls_engine = NULL;
113
0
  new_context->tls_keyform = mosq_k_pem;
114
0
  new_context->tls_use_os_certs = bridge->tls_use_os_certs;
115
0
  new_context->ssl_ctx_defaults = true;
116
0
#ifdef FINAL_WITH_TLS_PSK
117
0
  new_context->tls_psk_identity = bridge->tls_psk_identity;
118
0
  new_context->tls_psk = bridge->tls_psk;
119
0
#endif
120
0
#endif
121
122
0
  bridge->try_private_accepted = true;
123
0
  if(bridge->clean_start_local == -1){
124
    /* default to "regular" clean start setting */
125
0
    bridge->clean_start_local = bridge->clean_start;
126
0
  }
127
0
  new_context->retain_available = bridge->outgoing_retain;
128
0
  new_context->protocol = bridge->protocol_version;
129
0
  if(!bridge->clean_start_local){
130
0
    new_context->session_expiry_interval = UINT32_MAX;
131
0
    plugin_persist__handle_client_add(new_context);
132
0
    if(new_context->expiry_list_item){
133
      /* We've restored from persistence and been added to the session
134
       * expiry list, even though we should never be expired */
135
0
      session_expiry__remove(new_context);
136
0
    }
137
0
  }
138
139
0
  bridges = mosquitto_realloc(db.bridges, (size_t)(db.bridge_count+1)*sizeof(struct mosquitto *));
140
0
  if(bridges){
141
0
    db.bridges = bridges;
142
0
    db.bridge_count++;
143
0
    db.bridges[db.bridge_count-1] = new_context;
144
0
  }else{
145
0
    return NULL;
146
0
  }
147
148
0
  return new_context;
149
0
}
150
151
152
/*
 * Tear down a bridge context: send a DISCONNECT with reason code
 * MQTT_RC_SUCCESS on it, then hand it to context__cleanup().
 * NOTE(review): the return value of send__disconnect() is ignored, and the
 * meaning of the `true` argument to context__cleanup() is defined outside
 * this file - confirm before relying on it.
 */
static void bridge__destroy(struct mosquitto *context)
153
0
{
154
0
  send__disconnect(context, MQTT_RC_SUCCESS, NULL);
155
0
  context__cleanup(context, true);
156
0
}
157
158
159
void bridge__start_all(void)
160
0
{
161
0
  for(int i=0; i<db.config->bridge_count; i++){
162
0
    struct mosquitto *context;
163
0
    int ret;
164
165
0
    context = bridge__new(db.config->bridges[i]);
166
0
    if(!context){
167
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
168
0
      return;
169
0
    }
170
0
    assert(context);
171
172
#if defined(__GLIBC__) && defined(WITH_ADNS)
173
    context->bridge->restart_t = 1; /* force quick restart of bridge */
174
    loop__update_next_event(1000);
175
    ret = bridge__connect_step1(context);
176
#else
177
0
    ret = bridge__connect(context);
178
0
#endif
179
180
0
    if(ret && ret != MOSQ_ERR_CONN_PENDING){
181
0
      log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect bridge %s.",
182
0
          context->bridge->name);
183
0
    }
184
185
0
    db.config->bridges[i] = NULL;
186
0
  }
187
0
}
188
189
190
static int bridge__set_tcp_keepalive(struct mosquitto *context)
191
0
{
192
0
  unsigned int idle = context->bridge->tcp_keepalive_idle;
193
0
  unsigned int interval = context->bridge->tcp_keepalive_interval;
194
0
  unsigned int counter = context->bridge->tcp_keepalive_counter;
195
0
  unsigned int enabled = 1;
196
0
  bool ret;
197
198
0
  if(idle == 0 || interval == 0 || counter == 0){
199
0
    return MOSQ_ERR_SUCCESS;
200
0
  }
201
202
#ifdef WIN32
203
  ret = setsockopt(context->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&enabled, sizeof(enabled)) ||
204
      setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPIDLE, (char *)&idle, sizeof(idle)) ||
205
      setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPINTVL, (char *)&interval, sizeof(interval)) ||
206
      setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPCNT, (char *)&counter, sizeof(counter));
207
#else
208
0
  ret = setsockopt(context->sock, SOL_SOCKET, SO_KEEPALIVE, (const void *)&enabled, sizeof(enabled)) ||
209
0
#ifndef __APPLE__
210
0
      setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPIDLE, (const void *)&idle, sizeof(idle)) ||
211
0
#endif
212
0
      setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPINTVL, (const void *)&interval, sizeof(interval)) ||
213
0
      setsockopt(context->sock, IPPROTO_TCP, TCP_KEEPCNT, (const void *)&counter, sizeof(counter));
214
0
#endif
215
216
0
  if(ret){
217
0
    return MOSQ_ERR_UNKNOWN;
218
0
  }
219
220
0
  return MOSQ_ERR_SUCCESS;
221
0
}
222
223
#ifdef WITH_TCP_USER_TIMEOUT
224
225
226
static int bridge__set_tcp_user_timeout(struct mosquitto *context)
227
0
{
228
0
  int timeout = context->bridge->tcp_user_timeout;
229
0
  if(timeout >= 0){
230
0
    if(setsockopt(context->sock, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&timeout, sizeof(timeout))){
231
0
      return MOSQ_ERR_UNKNOWN;
232
0
    }
233
0
  }
234
235
0
  return MOSQ_ERR_SUCCESS;
236
0
}
237
#endif
238
239
#if defined(__GLIBC__) && defined(WITH_ADNS)
240
241
242
static int bridge__connect_step1(struct mosquitto *context)
243
{
244
  int rc;
245
  char *notification_topic;
246
  size_t notification_topic_len;
247
  uint8_t notification_payload;
248
  struct mosquitto__bridge_topic *cur_topic;
249
  uint8_t qos;
250
251
  if(!context || !context->bridge){
252
    return MOSQ_ERR_INVAL;
253
  }
254
255
  mosquitto__set_state(context, mosq_cs_new);
256
  context->sock = INVALID_SOCKET;
257
  context->last_msg_in = db.now_s;
258
  context->next_msg_out = db.now_s + context->bridge->keepalive;
259
  context->keepalive = context->bridge->keepalive;
260
  context->clean_start = context->bridge->clean_start;
261
  context->in_packet.payload = NULL;
262
  context->ping_t = 0;
263
  context->bridge->lazy_reconnect = false;
264
  context->maximum_packet_size = context->bridge->maximum_packet_size;
265
  bridge__packet_cleanup(context);
266
  db__message_reconnect_reset(context);
267
268
  db__messages_delete(context, false);
269
270
  /* Delete all local subscriptions even for clean_start==false. We don't
271
   * remove any messages and the next loop carries out the resubscription
272
   * anyway. This means any unwanted subs will be removed.
273
   */
274
  sub__clean_session(context);
275
276
  LL_FOREACH(context->bridge->topics, cur_topic){
277
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
278
      log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
279
      if(cur_topic->qos > context->max_qos){
280
        qos = context->max_qos;
281
      }else{
282
        qos = cur_topic->qos;
283
      }
284
      struct mosquitto_subscription sub;
285
      sub.topic_filter = cur_topic->local_topic;
286
      sub.identifier = 0;
287
      sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
288
      if(sub__add(context, &sub) > 0){
289
        return 1;
290
      }
291
      retain__queue(context, &sub);
292
    }
293
  }
294
295
  if(context->bridge->notifications){
296
    if(context->max_qos == 0){
297
      qos = 0;
298
    }else{
299
      qos = 1;
300
    }
301
    if(context->bridge->notification_topic){
302
      if(!context->bridge->initial_notification_done){
303
        notification_payload = '0';
304
        db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
305
        context->bridge->initial_notification_done = true;
306
      }
307
      notification_payload = '0';
308
      rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
309
      if(rc != MOSQ_ERR_SUCCESS){
310
        return rc;
311
      }
312
    }else{
313
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
314
      notification_topic = mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
315
      if(!notification_topic){
316
        return MOSQ_ERR_NOMEM;
317
      }
318
319
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
320
321
      if(!context->bridge->initial_notification_done){
322
        notification_payload = '0';
323
        db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
324
        context->bridge->initial_notification_done = true;
325
      }
326
327
      notification_payload = '0';
328
      rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
329
      mosquitto_FREE(notification_topic);
330
      if(rc != MOSQ_ERR_SUCCESS){
331
        return rc;
332
      }
333
    }
334
  }
335
336
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge (step 1) %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
337
  rc = net__try_connect_step1(context, context->bridge->addresses[context->bridge->cur_address].address);
338
  if(rc > 0){
339
    if(rc == MOSQ_ERR_TLS){
340
      mux__delete(context);
341
      net__socket_close(context);
342
      return rc; /* Error already printed */
343
    }else if(rc == MOSQ_ERR_ERRNO){
344
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
345
    }else if(rc == MOSQ_ERR_EAI){
346
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
347
    }
348
349
    return rc;
350
  }
351
352
  return MOSQ_ERR_SUCCESS;
353
}
354
355
356
static int bridge__connect_step2(struct mosquitto *context)
357
{
358
  int rc;
359
360
  if(!context || !context->bridge){
361
    return MOSQ_ERR_INVAL;
362
  }
363
364
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge (step 2) %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
365
  rc = net__try_connect_step2(context, context->bridge->addresses[context->bridge->cur_address].port, &context->sock);
366
  if(rc > 0){
367
    if(rc == MOSQ_ERR_TLS){
368
      mux__delete(context);
369
      net__socket_close(context);
370
      return rc; /* Error already printed */
371
    }else if(rc == MOSQ_ERR_ERRNO){
372
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
373
    }else if(rc == MOSQ_ERR_EAI){
374
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
375
    }
376
377
    return rc;
378
  }
379
380
  HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(context->sock), context);
381
382
  if(rc == MOSQ_ERR_CONN_PENDING){
383
    mosquitto__set_state(context, mosq_cs_connect_pending);
384
    mux__add_out(context);
385
  }
386
  return rc;
387
}
388
389
390
int bridge__connect_step3(struct mosquitto *context)
391
{
392
  int rc;
393
  mosquitto_property receive_maximum;
394
  mosquitto_property session_expiry_interval;
395
  mosquitto_property topic_alias_max;
396
  mosquitto_property *properties = NULL;
397
398
  rc = net__socket_connect_step3(context, context->bridge->addresses[context->bridge->cur_address].address);
399
  if(rc > 0){
400
    if(rc == MOSQ_ERR_TLS){
401
      mux__delete(context);
402
      net__socket_close(context);
403
      return rc; /* Error already printed */
404
    }else if(rc == MOSQ_ERR_ERRNO){
405
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
406
    }else if(rc == MOSQ_ERR_EAI){
407
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
408
    }
409
410
    return rc;
411
  }
412
413
  if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
414
    context->bridge->primary_retry = db.now_s + 5;
415
    loop__update_next_event(5000);
416
  }
417
418
  if(bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS){
419
    return MOSQ_ERR_UNKNOWN;
420
  }
421
#ifdef WITH_TCP_USER_TIMEOUT
422
  if(bridge__set_tcp_user_timeout(context)){
423
    return MOSQ_ERR_UNKNOWN;
424
  }
425
#endif
426
427
  if(context->bridge->receive_maximum != 0){
428
    receive_maximum.value.i16 = context->bridge->receive_maximum;
429
    receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
430
    receive_maximum.property_type = MQTT_PROP_TYPE_INT16;
431
    receive_maximum.client_generated = false;
432
    receive_maximum.next = properties;
433
    properties = &receive_maximum;
434
  }
435
  if(context->bridge->session_expiry_interval != 0){
436
    session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
437
    session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
438
    session_expiry_interval.property_type = MQTT_PROP_TYPE_INT32;
439
    session_expiry_interval.client_generated = false;
440
    session_expiry_interval.next = properties;
441
    properties = &session_expiry_interval;
442
  }
443
  if(context->bridge->max_topic_alias != 0){
444
    topic_alias_max.value.i16 = context->bridge->max_topic_alias;
445
    topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
446
    topic_alias_max.property_type = MQTT_PROP_TYPE_INT16;
447
    topic_alias_max.client_generated = false;
448
    topic_alias_max.next = properties;
449
    properties = &topic_alias_max;
450
  }
451
452
  rc = send__connect(context, context->keepalive, context->clean_start, properties);
453
  if(rc == MOSQ_ERR_SUCCESS){
454
    return MOSQ_ERR_SUCCESS;
455
  }else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
456
    return MOSQ_ERR_SUCCESS;
457
  }else{
458
    if(rc == MOSQ_ERR_TLS){
459
      return rc; /* Error already printed */
460
    }else if(rc == MOSQ_ERR_ERRNO){
461
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
462
    }else if(rc == MOSQ_ERR_EAI){
463
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
464
    }
465
    mux__delete(context);
466
    net__socket_close(context);
467
    return rc;
468
  }
469
}
470
#else
471
472
473
int bridge__connect(struct mosquitto *context)
474
0
{
475
0
  int rc, rc2;
476
0
  char *notification_topic = NULL;
477
0
  size_t notification_topic_len;
478
0
  uint8_t notification_payload;
479
0
  struct mosquitto__bridge_topic *cur_topic;
480
0
  uint8_t qos;
481
482
0
  mosquitto_property receive_maximum;
483
0
  mosquitto_property session_expiry_interval;
484
0
  mosquitto_property topic_alias_max;
485
0
  mosquitto_property *properties = NULL;
486
487
0
  if(!context || !context->bridge){
488
0
    return MOSQ_ERR_INVAL;
489
0
  }
490
491
0
  mosquitto__set_state(context, mosq_cs_new);
492
0
  context->sock = INVALID_SOCKET;
493
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
494
0
  context->last_msg_in = db.now_s;
495
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
496
0
  context->next_msg_out = db.now_s + context->bridge->keepalive;
497
0
  context->keepalive = context->bridge->keepalive;
498
0
  context->clean_start = context->bridge->clean_start;
499
0
  context->in_packet.payload = NULL;
500
0
  context->ping_t = 0;
501
0
  context->bridge->lazy_reconnect = false;
502
0
  context->maximum_packet_size = context->bridge->maximum_packet_size;
503
0
  bridge__packet_cleanup(context);
504
0
  db__message_reconnect_reset(context);
505
506
0
  db__messages_delete(context, false);
507
508
  /* Delete all local subscriptions even for clean_start==false. We don't
509
   * remove any messages and the next loop carries out the resubscription
510
   * anyway. This means any unwanted subs will be removed.
511
   */
512
0
  sub__clean_session(context);
513
514
0
  LL_FOREACH(context->bridge->topics, cur_topic){
515
0
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
516
0
      log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
517
0
      if(cur_topic->qos > context->max_qos){
518
0
        qos = context->max_qos;
519
0
      }else{
520
0
        qos = cur_topic->qos;
521
0
      }
522
0
      struct mosquitto_subscription sub;
523
0
      sub.topic_filter = cur_topic->local_topic;
524
0
      sub.identifier = 0;
525
0
      sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
526
0
      if(sub__add(context, &sub) > 0){
527
0
        return 1;
528
0
      }
529
0
    }
530
0
  }
531
532
0
  if(context->bridge->notifications){
533
0
    if(context->max_qos == 0){
534
0
      qos = 0;
535
0
    }else{
536
0
      qos = 1;
537
0
    }
538
0
    if(context->bridge->notification_topic){
539
0
      if(!context->bridge->initial_notification_done){
540
0
        notification_payload = '0';
541
0
        db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
542
0
        context->bridge->initial_notification_done = true;
543
0
      }
544
545
0
      notification_payload = '0';
546
0
      rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
547
0
      if(rc != MOSQ_ERR_SUCCESS){
548
0
        return rc;
549
0
      }
550
0
    }else{
551
0
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
552
0
      notification_topic = mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
553
0
      if(!notification_topic){
554
0
        return MOSQ_ERR_NOMEM;
555
0
      }
556
557
0
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
558
559
0
      if(!context->bridge->initial_notification_done){
560
0
        notification_payload = '0';
561
0
        db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
562
0
        context->bridge->initial_notification_done = true;
563
0
      }
564
565
0
      notification_payload = '0';
566
0
      rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
567
0
      if(rc != MOSQ_ERR_SUCCESS){
568
0
        mosquitto_FREE(notification_topic);
569
0
        return rc;
570
0
      }
571
0
      mosquitto_FREE(notification_topic);
572
0
    }
573
0
  }
574
575
0
  log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
576
0
  rc = net__socket_connect(context,
577
0
      context->bridge->addresses[context->bridge->cur_address].address,
578
0
      context->bridge->addresses[context->bridge->cur_address].port,
579
0
      context->bridge->bind_address,
580
0
      false);
581
582
0
  if(rc > 0){
583
0
    if(rc == MOSQ_ERR_TLS){
584
0
      mux__delete(context);
585
0
      net__socket_close(context);
586
0
      return rc; /* Error already printed */
587
0
    }else if(rc == MOSQ_ERR_ERRNO){
588
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
589
0
    }else if(rc == MOSQ_ERR_EAI){
590
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
591
0
    }
592
593
0
    return rc;
594
0
  }else if(rc == MOSQ_ERR_CONN_PENDING){
595
0
    mosquitto__set_state(context, mosq_cs_connect_pending);
596
0
    mux__add_out(context);
597
0
  }
598
599
0
  HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(context->sock), context);
600
601
0
  if(bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS){
602
0
    return MOSQ_ERR_UNKNOWN;
603
0
  }
604
0
#ifdef WITH_TCP_USER_TIMEOUT
605
0
  if(bridge__set_tcp_user_timeout(context)){
606
0
    return MOSQ_ERR_UNKNOWN;
607
0
  }
608
0
#endif
609
610
0
  if(context->bridge->receive_maximum != 0){
611
0
    receive_maximum.value.i16 = context->bridge->receive_maximum;
612
0
    receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
613
0
    receive_maximum.property_type = MQTT_PROP_TYPE_INT16;
614
0
    receive_maximum.client_generated = false;
615
0
    receive_maximum.next = properties;
616
0
    properties = &receive_maximum;
617
0
  }
618
0
  if(context->bridge->session_expiry_interval != 0){
619
0
    session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
620
0
    session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
621
0
    session_expiry_interval.property_type = MQTT_PROP_TYPE_INT32;
622
0
    session_expiry_interval.client_generated = false;
623
0
    session_expiry_interval.next = properties;
624
0
    properties = &session_expiry_interval;
625
0
  }
626
0
  if(context->bridge->max_topic_alias != 0){
627
0
    topic_alias_max.value.i16 = context->bridge->max_topic_alias;
628
0
    topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
629
0
    topic_alias_max.property_type = MQTT_PROP_TYPE_INT16;
630
0
    topic_alias_max.client_generated = false;
631
0
    topic_alias_max.next = properties;
632
0
    properties = &topic_alias_max;
633
0
  }
634
635
0
  rc2 = send__connect(context, context->keepalive, context->clean_start, properties);
636
0
  if(rc2 == MOSQ_ERR_SUCCESS){
637
0
    return rc;
638
0
  }else if(rc2 == MOSQ_ERR_ERRNO && errno == ENOTCONN){
639
0
    return MOSQ_ERR_SUCCESS;
640
0
  }else{
641
0
    if(rc2 == MOSQ_ERR_TLS){
642
0
      return rc2; /* Error already printed */
643
0
    }else if(rc2 == MOSQ_ERR_ERRNO){
644
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
645
0
    }else if(rc2 == MOSQ_ERR_EAI){
646
0
      log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
647
0
    }
648
0
    mux__delete(context);
649
0
    net__socket_close(context);
650
0
    return rc2;
651
0
  }
652
0
}
653
#endif
654
655
656
int bridge__on_connect(struct mosquitto *context)
657
0
{
658
0
  char *notification_topic;
659
0
  size_t notification_topic_len;
660
0
  struct mosquitto__bridge_topic *cur_topic;
661
0
  int sub_opts;
662
0
  bool retain = true;
663
0
  uint8_t qos;
664
665
0
  if(context->bridge->notifications){
666
0
    if(context->max_qos == 0){
667
0
      qos = 0;
668
0
    }else{
669
0
      qos = 1;
670
0
    }
671
0
    if(!context->retain_available){
672
0
      retain = false;
673
0
    }
674
0
    char notification_payload = '1';
675
0
    if(context->bridge->notification_topic){
676
0
      if(!context->bridge->notifications_local_only){
677
0
        if(send__real_publish(context, mosquitto__mid_generate(context),
678
0
            context->bridge->notification_topic, 1, &notification_payload, qos, retain, 0, 0, NULL, 0)){
679
680
0
          return 1;
681
0
        }
682
0
      }
683
0
      db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
684
0
    }else{
685
0
      notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
686
0
      notification_topic = mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
687
0
      if(!notification_topic){
688
0
        return MOSQ_ERR_NOMEM;
689
0
      }
690
691
0
      snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
692
0
      notification_payload = '1';
693
0
      if(!context->bridge->notifications_local_only){
694
0
        if(send__real_publish(context, mosquitto__mid_generate(context),
695
0
            notification_topic, 1, &notification_payload, qos, retain, 0, 0, NULL, 0)){
696
697
0
          mosquitto_FREE(notification_topic);
698
0
          return 1;
699
0
        }
700
0
      }
701
0
      db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, MSG_EXPIRY_INFINITE, NULL);
702
0
      mosquitto_FREE(notification_topic);
703
0
    }
704
0
  }
705
706
0
  LL_FOREACH(context->bridge->topics, cur_topic){
707
0
    if(cur_topic->direction == bd_in || cur_topic->direction == bd_both){
708
0
      if(cur_topic->qos > context->max_qos){
709
0
        sub_opts = context->max_qos;
710
0
      }else{
711
0
        sub_opts = cur_topic->qos;
712
0
      }
713
0
      if(context->bridge->protocol_version == mosq_p_mqtt5){
714
0
        sub_opts = sub_opts
715
0
            | MQTT_SUB_OPT_NO_LOCAL
716
0
            | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
717
0
            | MQTT_SUB_OPT_SEND_RETAIN_ALWAYS;
718
0
      }
719
0
      if(send__subscribe(context, NULL, 1, &cur_topic->remote_topic, sub_opts, NULL)){
720
0
        return 1;
721
0
      }
722
0
    }else{
723
0
      if(context->bridge->attempt_unsubscribe){
724
0
        if(send__unsubscribe(context, NULL, 1, &cur_topic->remote_topic, NULL)){
725
          /* direction = inwards only. This means we should not be subscribed
726
          * to the topic. It is possible that we used to be subscribed to
727
          * this topic so unsubscribe. */
728
0
          return 1;
729
0
        }
730
0
      }
731
0
    }
732
0
  }
733
0
  struct mosquitto_subscription sub;
734
0
  memset(&sub, 0, sizeof(sub));
735
0
  LL_FOREACH(context->bridge->topics, cur_topic){
736
0
    sub.topic_filter = cur_topic->local_topic;
737
0
    if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
738
0
      if(cur_topic->qos > context->max_qos){
739
0
        sub.options = context->max_qos;
740
0
      }else{
741
0
        sub.options = cur_topic->qos;
742
0
      }
743
0
      retain__queue(context, &sub);
744
0
    }
745
0
  }
746
747
0
  context->bridge->connected_at = db.now_s;
748
749
0
  return MOSQ_ERR_SUCCESS;
750
0
}
751
752
753
int bridge__register_local_connections(void)
754
0
{
755
0
  struct mosquitto *context, *ctxt_tmp = NULL;
756
757
0
  HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
758
0
    if(context->bridge){
759
0
      if(mux__new(context)){
760
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error in initial bridge registration: %s", strerror(errno));
761
0
        return MOSQ_ERR_UNKNOWN;
762
0
      }
763
0
      mux__add_out(context);
764
0
    }
765
0
  }
766
0
  return MOSQ_ERR_SUCCESS;
767
0
}
768
769
770
void bridge__reload(void)
771
0
{
772
0
  int i;
773
0
  int j;
774
775
  // destroy old bridges that dissappeared
776
0
  for(i=0; i<db.bridge_count; i++){
777
0
    for(j=0; j<db.config->bridge_count; j++){
778
0
      if(!strcmp(db.bridges[i]->bridge->name, db.config->bridges[j]->name)){
779
0
        break;
780
0
      }
781
0
    }
782
783
0
    if(j==db.config->bridge_count){
784
0
      bridge__destroy(db.bridges[i]);
785
0
    }
786
0
  }
787
788
0
  for(i=0; i<db.config->bridge_count; i++){
789
0
    for(j=0; j<db.bridge_count; j++){
790
0
      if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)){
791
0
        break;
792
0
      }
793
0
    }
794
795
0
    if(j==db.bridge_count){
796
      // a new bridge was found, create it
797
0
      bridge__new(db.config->bridges[i]);
798
0
      db.config->bridges[i] = NULL;
799
0
      continue;
800
0
    }
801
802
0
    if(db.config->bridges[i]->reload_type == brt_immediate){
803
      // in this case, an existing bridge should match
804
0
      for(j=0; j<db.bridge_count; j++){
805
0
        if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)){
806
0
          break;
807
0
        }
808
0
      }
809
810
0
      assert(j<db.bridge_count);
811
0
      db.bridges[j]->will_delay_interval = 0;
812
0
      bridge__destroy(db.bridges[j]);
813
0
      bridge__new(db.config->bridges[i]);
814
0
      db.config->bridges[i] = NULL;
815
0
    }
816
0
  }
817
0
}
818
819
820
void bridge__db_cleanup(void)
821
0
{
822
0
  int i;
823
824
0
  for(i=0; i<db.bridge_count; i++){
825
0
    if(db.bridges[i]){
826
0
      context__cleanup(db.bridges[i], true);
827
0
    }
828
0
  }
829
0
  mosquitto_FREE(db.bridges);
830
0
}
831
832
833
void bridge__cleanup(struct mosquitto *context)
834
0
{
835
0
  int i;
836
837
0
  assert(db.bridge_count > 0);
838
839
0
  for(i=0; i<db.bridge_count; i++){
840
0
    if(db.bridges[i] == context){
841
0
      db.bridges[i] = db.bridges[db.bridge_count-1];
842
0
      break;
843
0
    }
844
0
  }
845
846
0
  db.bridge_count--;
847
0
  if(db.bridge_count == 0){
848
0
    mosquitto_FREE(db.bridges);
849
0
  }else{
850
0
    db.bridges = mosquitto_realloc(db.bridges, (unsigned)db.bridge_count * sizeof(db.bridges[0]));
851
0
  }
852
853
0
  mosquitto_FREE(context->bridge->name);
854
0
  mosquitto_FREE(context->bridge->local_clientid);
855
0
  mosquitto_FREE(context->bridge->local_username);
856
0
  mosquitto_FREE(context->bridge->local_password);
857
0
  mosquitto_FREE(context->bridge->tls_certfile);
858
0
  mosquitto_FREE(context->bridge->tls_keyfile);
859
860
0
  if(context->bridge->remote_clientid != context->id){
861
0
    mosquitto_FREE(context->bridge->remote_clientid);
862
0
  }
863
0
  context->bridge->remote_clientid = NULL;
864
865
0
  if(context->bridge->remote_username != context->username){
866
0
    mosquitto_FREE(context->bridge->remote_username);
867
0
  }
868
0
  context->bridge->remote_username = NULL;
869
870
0
  if(context->bridge->remote_password != context->password){
871
0
    mosquitto_FREE(context->bridge->remote_password);
872
0
  }
873
0
  context->bridge->remote_password = NULL;
874
0
#ifdef WITH_TLS
875
0
  if(context->ssl_ctx){
876
0
    SSL_CTX_free(context->ssl_ctx);
877
0
    context->ssl_ctx = NULL;
878
0
  }
879
0
#endif
880
881
0
  for(i=0; i<context->bridge->address_count; i++){
882
0
    mosquitto_FREE(context->bridge->addresses[i].address);
883
0
  }
884
885
0
  mosquitto_FREE(context->bridge->addresses);
886
887
0
  config__bridge_cleanup(context->bridge);
888
0
  context->bridge = NULL;
889
0
}
890
891
892
static void bridge__packet_cleanup(struct mosquitto *context)
893
0
{
894
0
  struct mosquitto__packet *packet;
895
0
  if(!context){
896
0
    return;
897
0
  }
898
899
0
  while(context->out_packet){
900
0
    packet = context->out_packet;
901
0
    context->out_packet = context->out_packet->next;
902
0
    mosquitto_FREE(packet);
903
0
  }
904
0
  context->out_packet = NULL;
905
0
  context->out_packet_last = NULL;
906
0
  metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
907
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
908
0
  context->out_packet_count = 0;
909
0
  context->out_packet_bytes = 0;
910
911
0
  packet__cleanup(&(context->in_packet));
912
0
}
913
914
915
/* Return a pseudo-random integer in the half-open range [low, high).
 *
 * low:  inclusive lower bound.
 * high: exclusive upper bound.
 *
 * If the range is empty (high <= low) the function returns low instead of
 * dividing by zero. The random value is drawn as an unsigned integer so no
 * abs() is needed; abs(INT_MIN) is undefined behaviour.
 */
static int rand_between(int low, int high)
{
  /* Zero-initialised so a failing random source yields `low` rather than
   * reading an indeterminate value. */
  unsigned int r = 0;
  unsigned int span;

  if(high <= low){
    return low;
  }

  /* Computed in unsigned arithmetic so a wide range cannot overflow int. */
  span = (unsigned int)high - (unsigned int)low;

  mosquitto_getrandom(&r, sizeof(r));
  return (int)((unsigned int)low + (r % span));
}
921
922
923
static void bridge__backoff_step(struct mosquitto__bridge *bridge)
924
0
{
925
  /*
926
    “Decorrelated Jitter” calculation, according to:
927
      https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
928
  */
929
930
0
  bridge->restart_timeout = rand_between(bridge->backoff_base, bridge->restart_timeout * 3);
931
0
  if(bridge->restart_timeout > bridge->backoff_cap){
932
0
    bridge->restart_timeout = bridge->backoff_cap;
933
0
  }
934
0
}
935
936
937
static void bridge__backoff_reset(struct mosquitto__bridge *bridge)
938
0
{
939
0
  bridge->restart_timeout = bridge->backoff_base;
940
0
}
941
942
943
static void bridge__update_backoff(struct mosquitto__bridge *bridge)
944
0
{
945
0
  if(!bridge){
946
0
    return;
947
0
  }
948
0
  if(!bridge->backoff_cap){
949
    /* skip if not using jitter */
950
0
    return;
951
952
0
  }
953
0
  if(bridge->connected_at && db.now_s - bridge->connected_at >= bridge->stable_connection_period){
954
0
    log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s connection was stable enough, resetting backoff", bridge->name);
955
0
    bridge__backoff_reset(bridge);
956
0
  }else{
957
0
    bridge__backoff_step(bridge);
958
0
  }
959
960
0
  bridge->connected_at = 0;
961
962
0
  log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s next backoff will be %d ms", bridge->name, bridge->restart_timeout);
963
0
}
964
965
966
static void bridge_check_pending(struct mosquitto *context)
967
0
{
968
0
  int err;
969
0
  socklen_t len;
970
971
0
  if(context->state == mosq_cs_connect_pending){
972
0
    len = sizeof(int);
973
0
    if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
974
0
      if(err == 0){
975
0
        mosquitto__set_state(context, mosq_cs_new);
976
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
977
        if(context->bridge){
978
          bridge__connect_step3(context);
979
        }
980
#endif
981
0
      }else if(err == ECONNREFUSED){
982
0
        do_disconnect(context, MOSQ_ERR_CONN_LOST);
983
0
        return;
984
0
      }
985
0
    }else{
986
0
      do_disconnect(context, MOSQ_ERR_CONN_LOST);
987
0
      return;
988
0
    }
989
0
  }
990
0
}
991
992
993
static bool reload_if_needed(struct mosquitto *context)
994
0
{
995
0
  int i;
996
997
0
  for(i=0; i<db.config->bridge_count; i++){
998
0
    if(db.config->bridges[i] && !strcmp(context->bridge->name, db.config->bridges[i]->name)){
999
0
      bridge__destroy(context);
1000
0
      bridge__new(db.config->bridges[i]);
1001
0
      db.config->bridges[i] = NULL;
1002
0
      loop__update_next_event(100);
1003
0
      return true;
1004
0
    }
1005
0
  }
1006
1007
0
  return false;
1008
0
}
1009
1010
1011
void bridge_check(void)
1012
0
{
1013
0
  static time_t last_check = 0;
1014
0
  struct mosquitto *context = NULL;
1015
0
  socklen_t len;
1016
0
  int i;
1017
0
  int rc;
1018
0
  int err;
1019
1020
0
  if(db.now_s <= last_check){
1021
0
    return;
1022
0
  }
1023
1024
0
  for(i=0; i<db.bridge_count; i++){
1025
0
    if(!db.bridges[i]){
1026
0
      continue;
1027
0
    }
1028
1029
0
    context = db.bridges[i];
1030
1031
0
    if(net__is_connected(context)){
1032
0
      mosquitto__check_keepalive(context);
1033
0
      bridge_check_pending(context);
1034
1035
      /* Check for bridges that are not round robin and not currently
1036
       * connected to their primary broker. */
1037
0
      if(context->bridge->round_robin == false
1038
0
          && context->bridge->cur_address != 0
1039
0
          && context->bridge->primary_retry
1040
0
          && db.now_s >= context->bridge->primary_retry){
1041
1042
0
        if(context->bridge->primary_retry_sock == INVALID_SOCKET){
1043
0
          rc = net__try_connect(context->bridge->addresses[0].address,
1044
0
              context->bridge->addresses[0].port,
1045
0
              &context->bridge->primary_retry_sock,
1046
0
              context->bridge->bind_address, false);
1047
1048
0
          if(rc == 0){
1049
0
            COMPAT_CLOSE(context->bridge->primary_retry_sock);
1050
0
            context->bridge->primary_retry_sock = INVALID_SOCKET;
1051
0
            context->bridge->primary_retry = 0;
1052
0
            mux__delete(context);
1053
0
            net__socket_close(context);
1054
0
            context->bridge->cur_address = 0;
1055
0
          }
1056
0
        }else{
1057
0
          len = sizeof(int);
1058
0
          if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
1059
0
            if(err == 0){
1060
0
              COMPAT_CLOSE(context->bridge->primary_retry_sock);
1061
0
              context->bridge->primary_retry_sock = INVALID_SOCKET;
1062
0
              context->bridge->primary_retry = 0;
1063
0
              mux__delete(context);
1064
0
              net__socket_close(context);
1065
0
              context->bridge->cur_address = context->bridge->address_count-1;
1066
0
            }else{
1067
0
              COMPAT_CLOSE(context->bridge->primary_retry_sock);
1068
0
              context->bridge->primary_retry_sock = INVALID_SOCKET;
1069
0
              context->bridge->primary_retry = db.now_s+5;
1070
0
              loop__update_next_event(5000);
1071
0
            }
1072
0
          }else{
1073
0
            COMPAT_CLOSE(context->bridge->primary_retry_sock);
1074
0
            context->bridge->primary_retry_sock = INVALID_SOCKET;
1075
0
            context->bridge->primary_retry = db.now_s+5;
1076
0
            loop__update_next_event(5000);
1077
0
          }
1078
0
        }
1079
0
      }
1080
0
    }else{
1081
0
      if(reload_if_needed(context)){
1082
0
        continue;
1083
0
      }
1084
1085
      /* Want to try to restart the bridge connection */
1086
0
      if(!context->bridge->restart_t){
1087
0
        bridge__update_backoff(context->bridge);
1088
0
        context->bridge->restart_t = 1000*db.now_s+context->bridge->restart_timeout;
1089
0
        context->bridge->cur_address++;
1090
0
        if(context->bridge->cur_address == context->bridge->address_count){
1091
0
          context->bridge->cur_address = 0;
1092
0
        }
1093
0
        loop__update_next_event(context->bridge->restart_timeout);
1094
0
      }else{
1095
0
        if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
1096
0
            || (context->bridge->start_type == bst_automatic && 1000*db.now_s >= context->bridge->restart_t)){
1097
1098
#if defined(__GLIBC__) && defined(WITH_ADNS)
1099
          if(context->adns){
1100
            /* Connection attempted, waiting on DNS lookup */
1101
            rc = gai_error(context->adns);
1102
            if(rc == EAI_INPROGRESS){
1103
              /* Just keep on waiting */
1104
            }else if(rc == 0){
1105
              rc = bridge__connect_step2(context);
1106
              if(rc == MOSQ_ERR_SUCCESS){
1107
                mux__new(context);
1108
                if(context->out_packet){
1109
                  mux__add_out(context);
1110
                }
1111
              }else if(rc == MOSQ_ERR_CONN_PENDING){
1112
                mux__new(context);
1113
                mux__add_out(context);
1114
                context->bridge->restart_t = 0;
1115
              }else{
1116
                context->bridge->cur_address++;
1117
                if(context->bridge->cur_address == context->bridge->address_count){
1118
                  context->bridge->cur_address = 0;
1119
                }
1120
                context->bridge->restart_t = 0;
1121
              }
1122
            }else{
1123
              /* Need to retry */
1124
              if(context->adns->ar_result){
1125
                freeaddrinfo(context->adns->ar_result);
1126
              }
1127
              mosquitto_FREE(context->adns);
1128
              context->bridge->restart_t = 0;
1129
            }
1130
          }else{
1131
            rc = bridge__connect_step1(context);
1132
            if(rc){
1133
              context->bridge->cur_address++;
1134
              if(context->bridge->cur_address == context->bridge->address_count){
1135
                context->bridge->cur_address = 0;
1136
              }
1137
            }else{
1138
              /* Short wait for ADNS lookup */
1139
              context->bridge->restart_t = 1;
1140
              loop__update_next_event(1000);
1141
            }
1142
          }
1143
#else
1144
0
          {
1145
0
            rc = bridge__connect(context);
1146
0
            context->bridge->restart_t = 0;
1147
0
            if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING){
1148
0
              if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
1149
0
                context->bridge->primary_retry = db.now_s + 5;
1150
0
                loop__update_next_event(5000);
1151
0
              }
1152
0
              mux__new(context);
1153
0
              if(context->out_packet){
1154
0
                mux__add_out(context);
1155
0
              }
1156
0
            }else{
1157
0
              context->bridge->cur_address++;
1158
0
              if(context->bridge->cur_address == context->bridge->address_count){
1159
0
                context->bridge->cur_address = 0;
1160
0
              }
1161
0
            }
1162
0
          }
1163
0
#endif
1164
0
        }
1165
0
      }
1166
0
    }
1167
0
  }
1168
0
}
1169
1170
#endif