Coverage Report

Created: 2025-08-24 06:43

/src/mosquitto/src/subs.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright (c) 2010-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
/* A note on matching topic subscriptions.
20
 *
21
 * Topics can be up to 32767 characters in length. The / character is used as a
22
 * hierarchy delimiter. Messages are published to a particular topic.
23
 * Clients may subscribe to particular topics directly, but may also use
24
 * wildcards in subscriptions.  The + and # characters are used as wildcards.
25
 * The # wildcard can be used at the end of a subscription only, and is a
26
 * wildcard for the level of hierarchy at which it is placed and all subsequent
27
 * levels.
28
 * The + wildcard may be used at any point within the subscription and is a
29
 * wildcard for only the level of hierarchy at which it is placed.
30
 * Neither wildcard may be used as part of a substring.
31
 * Valid (each of these matches the example topic a/b/c):
32
 *  a/b/+
33
 *  a/+/c
34
 *  a/#
35
 *  a/b/#
36
 *  #
37
 *  +/b/c
38
 *  +/+/+
39
 * Invalid:
40
 *  a/#/c
41
 *  a+/b/c
42
 * Valid but non-matching (none of these matches the example topic a/b/c):
43
 *  a/b
44
 *  a/+
45
 *  +/b
46
 *  b/c/a
47
 *  a/b/d
48
 */
49
50
#include "config.h"
51
52
#include <assert.h>
53
#include <stdio.h>
54
#include <string.h>
55
56
#include "mosquitto_broker_internal.h"
57
#include "mosquitto/mqtt_protocol.h"
58
#include "util_mosq.h"
59
60
#include "utlist.h"
61
62
static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len);
63
64
/* Cached uthash hash values for the "+" and "#" wildcard keys. They are
 * filled in by sub__init() and used for the wildcard lookups in
 * sub__search(). */
static unsigned int hashv_plus = 0;
65
static unsigned int hashv_hash = 0;
66
67
static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
68
0
{
69
0
  bool client_retain;
70
0
  uint16_t mid;
71
0
  uint8_t client_qos, msg_qos;
72
0
  int rc2;
73
74
  /* Check for ACL topic access. */
75
0
  rc2 = mosquitto_acl_check(leaf->context, topic, stored->data.payloadlen, stored->data.payload, stored->data.qos, stored->data.retain, MOSQ_ACL_READ);
76
0
  if(rc2 == MOSQ_ERR_ACL_DENIED){
77
0
    return MOSQ_ERR_SUCCESS;
78
0
  }else if(rc2 == MOSQ_ERR_SUCCESS){
79
0
    client_qos = MQTT_SUB_OPT_GET_QOS(leaf->subscription_options);
80
81
0
    if(db.config->upgrade_outgoing_qos){
82
0
      msg_qos = client_qos;
83
0
    }else{
84
0
      if(qos > client_qos){
85
0
        msg_qos = client_qos;
86
0
      }else{
87
0
        msg_qos = qos;
88
0
      }
89
0
    }
90
0
    if(msg_qos){
91
0
      mid = mosquitto__mid_generate(leaf->context);
92
0
    }else{
93
0
      mid = 0;
94
0
    }
95
0
    if(MQTT_SUB_OPT_GET_RETAIN_AS_PUBLISHED(leaf->subscription_options)){
96
0
      client_retain = retain;
97
0
    }else{
98
0
      client_retain = false;
99
0
    }
100
0
    if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true, true) == 1){
101
0
      return 1;
102
0
    }
103
0
  }else{
104
0
    return 1; /* Application error */
105
0
  }
106
0
  return 0;
107
0
}
108
109
110
static int subs__shared_process(struct mosquitto__subhier *hier, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
111
0
{
112
0
  int rc = 0, rc2;
113
0
  struct mosquitto__subshared *shared, *shared_tmp;
114
0
  struct mosquitto__subleaf *leaf;
115
116
0
  HASH_ITER(hh, hier->shared, shared, shared_tmp){
117
0
    leaf = shared->subs;
118
0
    rc2 = subs__send(leaf, topic, qos, retain, stored);
119
    /* Remove current from the top, add back to the bottom */
120
0
    DL_DELETE(shared->subs, leaf);
121
0
    DL_APPEND(shared->subs, leaf);
122
123
0
    if(rc2) rc = 1;
124
0
  }
125
126
0
  return rc;
127
0
}
128
129
static int subs__process(struct mosquitto__subhier *hier, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
130
0
{
131
0
  int rc = 0;
132
0
  int rc2;
133
0
  struct mosquitto__subleaf *leaf;
134
135
0
  rc = subs__shared_process(hier, topic, qos, retain, stored);
136
137
0
  leaf = hier->subs;
138
0
  while(source_id && leaf){
139
0
    if(!leaf->context->id || (MQTT_SUB_OPT_GET_NO_LOCAL(leaf->subscription_options) && !strcmp(leaf->context->id, source_id))){
140
0
      leaf = leaf->next;
141
0
      continue;
142
0
    }
143
0
    rc2 = subs__send(leaf, topic, qos, retain, stored);
144
0
    if(rc2){
145
0
      rc = 1;
146
0
    }
147
0
    leaf = leaf->next;
148
0
  }
149
0
  if(hier->subs || hier->shared){
150
0
    return rc;
151
0
  }else{
152
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
153
0
  }
154
0
}
155
156
157
static int sub__add_leaf(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf)
158
0
{
159
0
  struct mosquitto__subleaf *leaf;
160
161
0
  *newleaf = NULL;
162
0
  leaf = *head;
163
164
0
  while(leaf){
165
0
    if(leaf->context && leaf->context->id && !strcmp(leaf->context->id, context->id)){
166
      /* Client making a second subscription to same topic. Only
167
       * need to update QoS. Return MOSQ_ERR_SUB_EXISTS to
168
       * indicate this to the calling function. */
169
0
      leaf->identifier = sub->identifier;
170
0
      leaf->subscription_options = sub->options;
171
0
      return MOSQ_ERR_SUB_EXISTS;
172
0
    }
173
0
    leaf = leaf->next;
174
0
  }
175
0
  leaf = mosquitto_calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic_filter) + 1);
176
0
  if(!leaf) return MOSQ_ERR_NOMEM;
177
0
  leaf->context = context;
178
0
  leaf->identifier = sub->identifier;
179
0
  leaf->subscription_options = sub->options;
180
0
  strcpy(leaf->topic_filter, sub->topic_filter);
181
182
0
  DL_APPEND(*head, leaf);
183
0
  *newleaf = leaf;
184
185
0
  return MOSQ_ERR_SUCCESS;
186
0
}
187
188
189
static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct mosquitto__subshared *shared, struct mosquitto__subleaf *leaf)
190
0
{
191
0
  DL_DELETE(shared->subs, leaf);
192
0
  if(shared->subs == NULL){
193
0
    HASH_DELETE(hh, subhier->shared, shared);
194
0
    mosquitto_FREE(shared);
195
0
  }
196
0
}
197
198
199
static int sub__add_shared(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, const char *sharename)
200
0
{
201
0
  struct mosquitto__subleaf *newleaf;
202
0
  struct mosquitto__subshared *shared = NULL;
203
0
  struct mosquitto__subleaf **subs;
204
0
  size_t slen;
205
0
  int rc;
206
0
  unsigned hashv;
207
208
0
  slen = strlen(sharename);
209
210
0
  HASH_VALUE(sharename, slen, hashv);
211
212
0
  HASH_FIND_BYHASHVALUE(hh, subhier->shared, sharename, slen, hashv, shared);
213
0
  if(shared == NULL){
214
0
    shared = mosquitto_calloc(1, sizeof(struct mosquitto__subshared) + slen + 1);
215
0
    if(!shared){
216
0
      return MOSQ_ERR_NOMEM;
217
0
    }
218
0
    strncpy(shared->name, sharename, slen+1);
219
220
0
    HASH_ADD_BYHASHVALUE(hh, subhier->shared, name, slen, hashv, shared);
221
0
  }
222
223
0
  rc = sub__add_leaf(context, sub, &shared->subs, &newleaf);
224
0
  if(rc > 0){
225
0
    if(shared->subs == NULL){
226
0
      HASH_DELETE(hh, subhier->shared, shared);
227
0
      mosquitto_FREE(shared);
228
0
    }
229
0
    return rc;
230
0
  }
231
232
0
  if(rc != MOSQ_ERR_SUB_EXISTS){
233
0
    newleaf->hier = subhier;
234
0
    newleaf->shared = shared;
235
236
0
    bool assigned = false;
237
0
    for(int i=0; i<context->subs_capacity; i++){
238
0
      if(!context->subs[i]){
239
0
        context->subs[i] = newleaf;
240
0
        context->subs_count++;
241
0
        assigned = true;
242
0
        break;
243
0
      }
244
0
    }
245
0
    if(assigned == false){
246
0
      subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1));
247
0
      if(!subs){
248
0
        sub__remove_shared_leaf(subhier, shared, newleaf);
249
0
        mosquitto_FREE(newleaf);
250
0
        return MOSQ_ERR_NOMEM;
251
0
      }
252
0
      context->subs = subs;
253
0
      context->subs_capacity++;
254
0
      context->subs_count++;
255
0
      context->subs[context->subs_capacity-1] = newleaf;
256
0
    }
257
0
#ifdef WITH_SYS_TREE
258
0
    db.shared_subscription_count++;
259
0
#endif
260
0
  }
261
262
0
  if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
263
0
    return rc;
264
0
  }else{
265
    /* mqttv311/mqttv5 requires retained messages are resent on
266
     * resubscribe. */
267
0
    return MOSQ_ERR_SUCCESS;
268
0
  }
269
0
}
270
271
272
static int sub__add_normal(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier)
273
0
{
274
0
  struct mosquitto__subleaf *newleaf = NULL;
275
0
  struct mosquitto__subleaf **subs;
276
0
  int rc;
277
278
0
  rc = sub__add_leaf(context, sub, &subhier->subs, &newleaf);
279
0
  if(rc > 0){
280
0
    return rc;
281
0
  }
282
283
0
  if(rc != MOSQ_ERR_SUB_EXISTS){
284
0
    newleaf->hier = subhier;
285
0
    newleaf->shared = NULL;
286
287
0
    bool assigned = false;
288
0
    for(int i=0; i<context->subs_capacity; i++){
289
0
      if(!context->subs[i]){
290
0
        context->subs[i] = newleaf;
291
0
        context->subs_count++;
292
0
        assigned = true;
293
0
        break;
294
0
      }
295
0
    }
296
0
    if(assigned == false){
297
0
      subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1));
298
0
      if(!subs){
299
0
        DL_DELETE(subhier->subs, newleaf);
300
0
        mosquitto_FREE(newleaf);
301
0
        return MOSQ_ERR_NOMEM;
302
0
      }
303
0
      context->subs = subs;
304
0
      context->subs_capacity++;
305
0
      context->subs_count++;
306
0
      context->subs[context->subs_capacity-1] = newleaf;
307
0
    }
308
0
#ifdef WITH_SYS_TREE
309
0
    db.subscription_count++;
310
0
#endif
311
0
  }
312
313
0
  if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
314
0
    return rc;
315
0
  }else{
316
    /* mqttv311/mqttv5 requires retained messages are resent on
317
     * resubscribe. */
318
0
    return MOSQ_ERR_SUCCESS;
319
0
  }
320
0
}
321
322
323
static int sub__add_context(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, char *const *const topics, const char *sharename)
324
0
{
325
0
  struct mosquitto__subhier *branch;
326
0
  int topic_index = 0;
327
0
  size_t topiclen;
328
329
  /* Find leaf node */
330
0
  while(topics && topics[topic_index] != NULL){
331
0
    topiclen = strlen(topics[topic_index]);
332
0
    if(topiclen > UINT16_MAX){
333
0
      return MOSQ_ERR_INVAL;
334
0
    }
335
0
    HASH_FIND(hh, subhier->children, topics[topic_index], topiclen, branch);
336
0
    if(!branch){
337
      /* Not found */
338
0
      branch = sub__add_hier_entry(subhier, &subhier->children, topics[topic_index], (uint16_t)topiclen);
339
0
      if(!branch) return MOSQ_ERR_NOMEM;
340
0
    }
341
0
    subhier = branch;
342
0
    topic_index++;
343
0
  }
344
345
  /* Add add our context */
346
0
  if(context && context->id){
347
0
    if(sharename){
348
0
      return sub__add_shared(context, sub, subhier, sharename);
349
0
    }else{
350
0
      return sub__add_normal(context, sub, subhier);
351
0
    }
352
0
  }else{
353
0
    return MOSQ_ERR_SUCCESS;
354
0
  }
355
0
}
356
357
358
static int sub__remove_normal(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason)
359
0
{
360
0
  struct mosquitto__subleaf *leaf;
361
362
0
  leaf = subhier->subs;
363
0
  while(leaf){
364
0
    if(leaf->context==context){
365
0
#ifdef WITH_SYS_TREE
366
0
      db.subscription_count--;
367
0
#endif
368
0
      DL_DELETE(subhier->subs, leaf);
369
370
      /* Remove the reference to the sub that the client is keeping.
371
       * It would be nice to be able to use the reference directly,
372
       * but that would involve keeping a copy of the topic string in
373
       * each subleaf. Might be worth considering though. */
374
0
      for(int i=0; i<context->subs_capacity; i++){
375
0
        if(context->subs[i] && context->subs[i]->hier == subhier){
376
0
          context->subs_count--;
377
0
          mosquitto_free(context->subs[i]);
378
0
          context->subs[i] = NULL;
379
0
          break;
380
0
        }
381
0
      }
382
0
      *reason = 0;
383
0
      return MOSQ_ERR_SUCCESS;
384
0
    }
385
0
    leaf = leaf->next;
386
0
  }
387
0
  return MOSQ_ERR_NO_SUBSCRIBERS;
388
0
}
389
390
391
static int sub__remove_shared(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason, const char *sharename)
392
0
{
393
0
  struct mosquitto__subshared *shared;
394
395
0
  HASH_FIND(hh, subhier->shared, sharename, strlen(sharename), shared);
396
0
  if(shared){
397
0
    struct mosquitto__subleaf *leaf = shared->subs;
398
0
    while(leaf){
399
0
      if(leaf->context==context){
400
0
#ifdef WITH_SYS_TREE
401
0
        db.shared_subscription_count--;
402
0
#endif
403
0
        DL_DELETE(shared->subs, leaf);
404
405
        /* Remove the reference to the sub that the client is keeping.
406
        * It would be nice to be able to use the reference directly,
407
        * but that would involve keeping a copy of the topic string in
408
        * each subleaf. Might be worth considering though. */
409
0
        for(int i=0; i<context->subs_capacity; i++){
410
0
          if(context->subs[i]
411
0
              && context->subs[i]->hier == subhier
412
0
              && context->subs[i]->shared == shared){
413
414
0
            mosquitto_free(context->subs[i]);
415
0
            context->subs[i] = NULL;
416
0
            context->subs_count--;
417
0
            break;
418
0
          }
419
0
        }
420
421
0
        if(shared->subs == NULL){
422
0
          HASH_DELETE(hh, subhier->shared, shared);
423
0
          mosquitto_FREE(shared);
424
0
        }
425
426
0
        *reason = 0;
427
0
        return MOSQ_ERR_SUCCESS;
428
0
      }
429
0
      leaf = leaf->next;
430
0
    }
431
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
432
0
  }else{
433
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
434
0
  }
435
0
}
436
437
438
static int sub__remove_recurse(struct mosquitto *context, struct mosquitto__subhier *subhier, char **topics, uint8_t *reason, const char *sharename)
439
0
{
440
0
  struct mosquitto__subhier *branch;
441
442
0
  if(topics == NULL || topics[0] == NULL){
443
0
    if(sharename){
444
0
      return sub__remove_shared(context, subhier, reason, sharename);
445
0
    }else{
446
0
      return sub__remove_normal(context, subhier, reason);
447
0
    }
448
0
  }
449
450
0
  HASH_FIND(hh, subhier->children, topics[0], strlen(topics[0]), branch);
451
0
  if(branch){
452
0
    sub__remove_recurse(context, branch, &(topics[1]), reason, sharename);
453
0
    if(!branch->children && !branch->subs && !branch->shared){
454
0
      HASH_DELETE(hh, subhier->children, branch);
455
0
      mosquitto_FREE(branch);
456
0
    }
457
0
  }
458
0
  return MOSQ_ERR_SUCCESS;
459
0
}
460
461
462
static int sub__search(struct mosquitto__subhier *subhier, char **split_topics, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
463
0
{
464
  /* FIXME - need to take into account source_id if the client is a bridge */
465
0
  struct mosquitto__subhier *branch;
466
0
  int rc;
467
0
  bool have_subscribers = false;
468
469
0
  if(split_topics && split_topics[0]){
470
    /* Check for literal match */
471
0
    HASH_FIND(hh, subhier->children, split_topics[0], strlen(split_topics[0]), branch);
472
473
0
    if(branch){
474
0
      rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored);
475
0
      if(rc == MOSQ_ERR_SUCCESS){
476
0
        have_subscribers = true;
477
0
      }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
478
0
        return rc;
479
0
      }
480
0
      if(split_topics[1] == NULL){ /* End of list */
481
0
        rc = subs__process(branch, source_id, topic, qos, retain, stored);
482
0
        if(rc == MOSQ_ERR_SUCCESS){
483
0
          have_subscribers = true;
484
0
        }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
485
0
          return rc;
486
0
        }
487
0
      }
488
0
    }
489
490
    /* Check for + match */
491
0
    HASH_FIND_BYHASHVALUE(hh, subhier->children, "+", 1, hashv_plus, branch);
492
493
0
    if(branch){
494
0
      rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored);
495
0
      if(rc == MOSQ_ERR_SUCCESS){
496
0
        have_subscribers = true;
497
0
      }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
498
0
        return rc;
499
0
      }
500
0
      if(split_topics[1] == NULL){ /* End of list */
501
0
        rc = subs__process(branch, source_id, topic, qos, retain, stored);
502
0
        if(rc == MOSQ_ERR_SUCCESS){
503
0
          have_subscribers = true;
504
0
        }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
505
0
          return rc;
506
0
        }
507
0
      }
508
0
    }
509
0
  }
510
511
  /* Check for # match */
512
0
  HASH_FIND_BYHASHVALUE(hh, subhier->children, "#", 1, hashv_hash, branch);
513
0
  if(branch && !branch->children){
514
    /* The topic matches due to a # wildcard - process the
515
     * subscriptions but *don't* return. Although this branch has ended
516
     * there may still be other subscriptions to deal with.
517
     */
518
0
    rc = subs__process(branch, source_id, topic, qos, retain, stored);
519
0
    if(rc == MOSQ_ERR_SUCCESS){
520
0
      have_subscribers = true;
521
0
    }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
522
0
      return rc;
523
0
    }
524
0
  }
525
526
0
  if(have_subscribers){
527
0
    return MOSQ_ERR_SUCCESS;
528
0
  }else{
529
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
530
0
  }
531
0
}
532
533
534
static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len)
535
0
{
536
0
  struct mosquitto__subhier *child;
537
538
0
  assert(sibling);
539
540
0
  child = mosquitto_calloc(1, sizeof(struct mosquitto__subhier) + len + 1);
541
0
  if(!child){
542
0
    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
543
0
    return NULL;
544
0
  }
545
0
  child->parent = parent;
546
0
  child->topic_len = len;
547
0
  if(len > 0){
548
0
    strncpy(child->topic, topic, len);
549
0
  }
550
551
0
  HASH_ADD(hh, *sibling, topic, child->topic_len, child);
552
553
0
  return child;
554
0
}
555
556
557
int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub)
558
0
{
559
0
  int rc = 0;
560
0
  struct mosquitto__subhier *subhier;
561
0
  const char *sharename = NULL;
562
0
  char *local_sub;
563
0
  char **topics;
564
0
  size_t topiclen;
565
566
0
  assert(sub);
567
0
  assert(sub->topic_filter);
568
569
0
  rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &topics, &sharename);
570
0
  if(rc) return rc;
571
572
0
  topiclen = strlen(topics[0]);
573
0
  if(topiclen > UINT16_MAX){
574
0
    mosquitto_FREE(local_sub);
575
0
    mosquitto_FREE(topics);
576
0
    return MOSQ_ERR_INVAL;
577
0
  }
578
579
0
  if(sharename){
580
0
    HASH_FIND(hh, db.shared_subs, topics[0], topiclen, subhier);
581
0
    if(!subhier){
582
0
      subhier = sub__add_hier_entry(NULL, &db.shared_subs, topics[0], (uint16_t)topiclen);
583
0
      if(!subhier){
584
0
        mosquitto_FREE(local_sub);
585
0
        mosquitto_FREE(topics);
586
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
587
0
        return MOSQ_ERR_NOMEM;
588
0
      }
589
0
    }
590
0
  }else{
591
0
    HASH_FIND(hh, db.normal_subs, topics[0], topiclen, subhier);
592
0
    if(!subhier){
593
0
      subhier = sub__add_hier_entry(NULL, &db.normal_subs, topics[0], (uint16_t)topiclen);
594
0
      if(!subhier){
595
0
        mosquitto_FREE(local_sub);
596
0
        mosquitto_FREE(topics);
597
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
598
0
        return MOSQ_ERR_NOMEM;
599
0
      }
600
0
    }
601
0
  }
602
0
  rc = sub__add_context(context, sub, subhier, topics, sharename);
603
604
0
  mosquitto_FREE(local_sub);
605
0
  mosquitto_FREE(topics);
606
607
0
  return rc;
608
0
}
609
610
int sub__remove(struct mosquitto *context, const char *sub, uint8_t *reason)
611
0
{
612
0
  int rc = 0;
613
0
  struct mosquitto__subhier *subhier;
614
0
  const char *sharename = NULL;
615
0
  char *local_sub = NULL;
616
0
  char **topics = NULL;
617
618
0
  assert(sub);
619
620
0
  rc = sub__topic_tokenise(sub, &local_sub, &topics, &sharename);
621
0
  if(rc) return rc;
622
623
0
  if(sharename){
624
0
    HASH_FIND(hh, db.shared_subs, topics[0], strlen(topics[0]), subhier);
625
0
  }else{
626
0
    HASH_FIND(hh, db.normal_subs, topics[0], strlen(topics[0]), subhier);
627
0
  }
628
0
  if(subhier){
629
0
    *reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED;
630
0
    rc = sub__remove_recurse(context, subhier, topics, reason, sharename);
631
0
  }
632
633
0
  mosquitto_FREE(local_sub);
634
0
  mosquitto_FREE(topics);
635
636
0
  return rc;
637
0
}
638
639
int sub__messages_queue(const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg **stored)
640
0
{
641
0
  int rc = MOSQ_ERR_SUCCESS, rc2;
642
0
  int rc_normal = MOSQ_ERR_NO_SUBSCRIBERS, rc_shared = MOSQ_ERR_NO_SUBSCRIBERS;
643
0
  struct mosquitto__subhier *subhier;
644
0
  char **split_topics = NULL;
645
0
  char *local_topic = NULL;
646
0
  unsigned hashv;
647
0
  size_t topiclen;
648
649
0
  assert(topic);
650
651
0
  if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)) return 1;
652
653
  /* Protect this message until we have sent it to all
654
  clients - this is required because websockets client calls
655
  db__message_write(), which could remove the message if ref_count==0.
656
  */
657
0
  db__msg_store_ref_inc(*stored);
658
659
0
  topiclen = strlen(split_topics[0]);
660
0
  HASH_VALUE(split_topics[0], topiclen, hashv);
661
0
  HASH_FIND_BYHASHVALUE(hh, db.normal_subs, split_topics[0], topiclen, hashv, subhier);
662
0
  if(subhier){
663
0
    rc_normal = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
664
0
    if(rc_normal > 0){
665
0
      rc = rc_normal;
666
0
      goto end;
667
0
    }
668
0
  }
669
670
0
  HASH_FIND_BYHASHVALUE(hh, db.shared_subs, split_topics[0], topiclen, hashv, subhier);
671
0
  if(subhier){
672
0
    rc_shared = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
673
0
    if(rc_shared > 0){
674
0
      rc = rc_shared;
675
0
      goto end;
676
0
    }
677
0
  }
678
679
0
  if(rc_normal == MOSQ_ERR_NO_SUBSCRIBERS && rc_shared == MOSQ_ERR_NO_SUBSCRIBERS){
680
0
    rc = MOSQ_ERR_NO_SUBSCRIBERS;
681
0
  }
682
683
0
  if(retain){
684
0
    rc2 = retain__store(topic, *stored, split_topics, true);
685
0
    if(rc2) rc = rc2;
686
0
  }
687
688
0
end:
689
0
  mosquitto_FREE(split_topics);
690
0
  mosquitto_FREE(local_topic);
691
  /* Remove our reference and free if needed. */
692
0
  db__msg_store_ref_dec(stored);
693
694
0
  return rc;
695
0
}
696
697
698
/* Remove a subhier element, and return its parent if that needs freeing as well. */
699
static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub)
700
0
{
701
0
  struct mosquitto__subhier *parent;
702
703
0
  if(!sub || !sub->parent){
704
0
    return NULL;
705
0
  }
706
707
0
  if(sub->children || sub->subs){
708
0
    return NULL;
709
0
  }
710
711
0
  parent = sub->parent;
712
0
  HASH_DELETE(hh, parent->children, sub);
713
0
  mosquitto_FREE(sub);
714
715
0
  if(parent->subs == NULL
716
0
      && parent->children == NULL
717
0
      && parent->shared == NULL
718
0
      && parent->parent){
719
720
0
    return parent;
721
0
  }else{
722
0
    return NULL;
723
0
  }
724
0
}
725
726
727
/* Remove all subscriptions for a client.
728
 */
729
int sub__clean_session(struct mosquitto *context)
730
1.55k
{
731
1.55k
  for(int i=0; i<context->subs_capacity; i++){
732
0
    if(context->subs[i] == NULL || context->subs[i]->hier == NULL){
733
0
      continue;
734
0
    }
735
736
0
    struct mosquitto__subhier *hier = context->subs[i]->hier;
737
0
    struct mosquitto__subleaf *leaf;
738
739
0
    plugin_persist__handle_subscription_delete(context, context->subs[i]->topic_filter);
740
0
    if(context->subs[i]->shared){
741
0
      leaf = context->subs[i]->shared->subs;
742
0
      while(leaf){
743
0
        if(leaf->context==context){
744
0
#ifdef WITH_SYS_TREE
745
0
          db.shared_subscription_count--;
746
0
#endif
747
0
          sub__remove_shared_leaf(context->subs[i]->hier, context->subs[i]->shared, leaf);
748
0
          break;
749
0
        }
750
0
        leaf = leaf->next;
751
0
      }
752
0
    }else{
753
0
      leaf = hier->subs;
754
0
      while(leaf){
755
0
        if(leaf->context==context){
756
0
#ifdef WITH_SYS_TREE
757
0
          db.subscription_count--;
758
0
#endif
759
0
          DL_DELETE(hier->subs, leaf);
760
0
          break;
761
0
        }
762
0
        leaf = leaf->next;
763
0
      }
764
0
    }
765
0
    mosquitto_FREE(context->subs[i]);
766
767
0
    if(hier->subs == NULL
768
0
        && hier->children == NULL
769
0
        && hier->shared == NULL
770
0
        && hier->parent){
771
772
0
      do{
773
0
        hier = tmp_remove_subs(hier);
774
0
      }while(hier);
775
0
    }
776
0
  }
777
1.55k
  mosquitto_FREE(context->subs);
778
1.55k
  context->subs_capacity = 0;
779
1.55k
  context->subs_count = 0;
780
781
1.55k
  return MOSQ_ERR_SUCCESS;
782
1.55k
}
783
784
void sub__tree_print(struct mosquitto__subhier *root, int level)
785
0
{
786
0
  int i;
787
0
  struct mosquitto__subhier *branch, *branch_tmp;
788
0
  struct mosquitto__subleaf *leaf;
789
790
0
  HASH_ITER(hh, root, branch, branch_tmp){
791
0
    if(level > -1){
792
0
      for(i=0; i<(level+2)*2; i++){
793
0
        printf(" ");
794
0
      }
795
0
      printf("%s", branch->topic);
796
0
      leaf = branch->subs;
797
0
      while(leaf){
798
0
        if(leaf->context){
799
0
          printf(" (%s, %d)", leaf->context->id, MQTT_SUB_OPT_GET_QOS(leaf->subscription_options));
800
0
        }else{
801
0
          printf(" (%s, %d)", "", MQTT_SUB_OPT_GET_QOS(leaf->subscription_options));
802
0
        }
803
0
        leaf = leaf->next;
804
0
      }
805
0
      printf("\n");
806
0
    }
807
808
0
    sub__tree_print(branch->children, level+1);
809
0
  }
810
0
}
811
812
int sub__init(void)
813
0
{
814
0
  HASH_VALUE("+", 1, hashv_plus);
815
0
  HASH_VALUE("#", 1, hashv_hash);
816
817
0
  if(sub__add_hier_entry(NULL, &db.shared_subs, "", 0) == NULL
818
0
      || sub__add_hier_entry(NULL, &db.normal_subs, "", 0) == NULL
819
0
      || sub__add_hier_entry(NULL, &db.normal_subs, "$SYS", (uint16_t)strlen("$SYS")) == NULL
820
0
      ){
821
822
0
    return MOSQ_ERR_NOMEM;
823
0
  }else{
824
0
    return MOSQ_ERR_SUCCESS;
825
0
  }
826
0
}