Coverage Report

Created: 2025-11-14 07:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/src/subs.c
Line
Count
Source
1
/*
2
Copyright (c) 2010-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
/* A note on matching topic subscriptions.
20
 *
21
 * Topics can be up to 65535 bytes in length. The / character is used as a
22
 * hierarchy delimiter. Messages are published to a particular topic.
23
 * Clients may subscribe to particular topics directly, but may also use
24
 * wildcards in subscriptions.  The + and # characters are used as wildcards.
25
 * The # wildcard can be used at the end of a subscription only, and is a
26
 * wildcard for the level of hierarchy at which it is placed and all subsequent
27
 * levels.
28
 * The + wildcard may be used at any point within the subscription and is a
29
 * wildcard for only the level of hierarchy at which it is placed.
30
 * Neither wildcard may be used as part of a substring.
31
 * Valid:
32
 *  a/b/+
33
 *  a/+/c
34
 *  a/#
35
 *  a/b/#
36
 *  #
37
 *  +/b/c
38
 *  +/+/+
39
 * Invalid:
40
 *  a/#/c
41
 *  a+/b/c
42
 * Valid but non-matching:
43
 *  a/b
44
 *  a/+
45
 *  +/b
46
 *  b/c/a
47
 *  a/b/d
48
 */
49
50
#include "config.h"
51
52
#include <assert.h>
53
#include <stdio.h>
54
#include <string.h>
55
56
#include "mosquitto_broker_internal.h"
57
#include "mosquitto/mqtt_protocol.h"
58
#include "util_mosq.h"
59
60
#include "utlist.h"
61
62
static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len);
63
64
static unsigned int hashv_plus = 0;
65
static unsigned int hashv_hash = 0;
66
67
68
static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
69
0
{
70
0
  bool client_retain;
71
0
  uint16_t mid;
72
0
  uint8_t client_qos, msg_qos;
73
0
  int rc2;
74
75
  /* Check for ACL topic access. */
76
0
  rc2 = mosquitto_acl_check(leaf->context, topic, stored->data.payloadlen, stored->data.payload, stored->data.qos, stored->data.retain, MOSQ_ACL_READ);
77
0
  if(rc2 == MOSQ_ERR_ACL_DENIED){
78
0
    return MOSQ_ERR_SUCCESS;
79
0
  }else if(rc2 == MOSQ_ERR_SUCCESS){
80
0
    client_qos = MQTT_SUB_OPT_GET_QOS(leaf->subscription_options);
81
82
0
    if(db.config->upgrade_outgoing_qos){
83
0
      msg_qos = client_qos;
84
0
    }else{
85
0
      if(qos > client_qos){
86
0
        msg_qos = client_qos;
87
0
      }else{
88
0
        msg_qos = qos;
89
0
      }
90
0
    }
91
0
    if(msg_qos){
92
0
      mid = mosquitto__mid_generate(leaf->context);
93
0
    }else{
94
0
      mid = 0;
95
0
    }
96
0
    if(MQTT_SUB_OPT_GET_RETAIN_AS_PUBLISHED(leaf->subscription_options)){
97
0
      client_retain = retain;
98
0
    }else{
99
0
      client_retain = false;
100
0
    }
101
0
    if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true, true) == 1){
102
0
      return 1;
103
0
    }
104
0
  }else{
105
0
    return 1; /* Application error */
106
0
  }
107
0
  return 0;
108
0
}
109
110
111
static int subs__shared_process(struct mosquitto__subhier *hier, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
112
0
{
113
0
  int rc = 0, rc2;
114
0
  struct mosquitto__subshared *shared, *shared_tmp;
115
0
  struct mosquitto__subleaf *leaf;
116
117
0
  HASH_ITER(hh, hier->shared, shared, shared_tmp){
118
0
    leaf = shared->subs;
119
0
    rc2 = subs__send(leaf, topic, qos, retain, stored);
120
    /* Remove current from the top, add back to the bottom */
121
0
    DL_DELETE(shared->subs, leaf);
122
0
    DL_APPEND(shared->subs, leaf);
123
124
0
    if(rc2){
125
0
      rc = 1;
126
0
    }
127
0
  }
128
129
0
  return rc;
130
0
}
131
132
133
static int subs__process(struct mosquitto__subhier *hier, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
134
0
{
135
0
  int rc = 0;
136
0
  int rc2;
137
0
  struct mosquitto__subleaf *leaf;
138
139
0
  rc = subs__shared_process(hier, topic, qos, retain, stored);
140
141
0
  leaf = hier->subs;
142
0
  while(source_id && leaf){
143
0
    if(!leaf->context->id || (MQTT_SUB_OPT_GET_NO_LOCAL(leaf->subscription_options) && !strcmp(leaf->context->id, source_id))){
144
0
      leaf = leaf->next;
145
0
      continue;
146
0
    }
147
0
    rc2 = subs__send(leaf, topic, qos, retain, stored);
148
0
    if(rc2){
149
0
      rc = 1;
150
0
    }
151
0
    leaf = leaf->next;
152
0
  }
153
0
  if(hier->subs || hier->shared){
154
0
    return rc;
155
0
  }else{
156
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
157
0
  }
158
0
}
159
160
161
static int sub__add_leaf(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf)
162
0
{
163
0
  struct mosquitto__subleaf *leaf;
164
165
0
  *newleaf = NULL;
166
0
  leaf = *head;
167
168
0
  while(leaf){
169
0
    if(leaf->context && leaf->context->id && !strcmp(leaf->context->id, context->id)){
170
      /* Client making a second subscription to same topic. Only
171
       * need to update QoS. Return MOSQ_ERR_SUB_EXISTS to
172
       * indicate this to the calling function. */
173
0
      leaf->identifier = sub->identifier;
174
0
      leaf->subscription_options = sub->options;
175
0
      return MOSQ_ERR_SUB_EXISTS;
176
0
    }
177
0
    leaf = leaf->next;
178
0
  }
179
0
  leaf = mosquitto_calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic_filter) + 1);
180
0
  if(!leaf){
181
0
    return MOSQ_ERR_NOMEM;
182
0
  }
183
0
  leaf->context = context;
184
0
  leaf->identifier = sub->identifier;
185
0
  leaf->subscription_options = sub->options;
186
0
  strcpy(leaf->topic_filter, sub->topic_filter);
187
188
0
  DL_APPEND(*head, leaf);
189
0
  *newleaf = leaf;
190
191
0
  return MOSQ_ERR_SUCCESS;
192
0
}
193
194
195
static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct mosquitto__subshared *shared, struct mosquitto__subleaf *leaf)
196
0
{
197
0
  DL_DELETE(shared->subs, leaf);
198
0
  if(shared->subs == NULL){
199
0
    HASH_DELETE(hh, subhier->shared, shared);
200
0
    mosquitto_FREE(shared);
201
0
  }
202
0
}
203
204
205
static int sub__add_shared(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, const char *sharename)
206
0
{
207
0
  struct mosquitto__subleaf *newleaf;
208
0
  struct mosquitto__subshared *shared = NULL;
209
0
  struct mosquitto__subleaf **subs;
210
0
  size_t slen;
211
0
  int rc;
212
0
  unsigned hashv;
213
214
0
  slen = strlen(sharename);
215
216
0
  HASH_VALUE(sharename, slen, hashv);
217
218
0
  HASH_FIND_BYHASHVALUE(hh, subhier->shared, sharename, slen, hashv, shared);
219
0
  if(shared == NULL){
220
0
    shared = mosquitto_calloc(1, sizeof(struct mosquitto__subshared) + slen + 1);
221
0
    if(!shared){
222
0
      return MOSQ_ERR_NOMEM;
223
0
    }
224
0
    strncpy(shared->name, sharename, slen+1);
225
226
0
    HASH_ADD_BYHASHVALUE(hh, subhier->shared, name, slen, hashv, shared);
227
0
  }
228
229
0
  rc = sub__add_leaf(context, sub, &shared->subs, &newleaf);
230
0
  if(rc > 0){
231
0
    if(shared->subs == NULL){
232
0
      HASH_DELETE(hh, subhier->shared, shared);
233
0
      mosquitto_FREE(shared);
234
0
    }
235
0
    return rc;
236
0
  }
237
238
0
  if(rc != MOSQ_ERR_SUB_EXISTS){
239
0
    newleaf->hier = subhier;
240
0
    newleaf->shared = shared;
241
242
0
    bool assigned = false;
243
0
    for(int i=0; i<context->subs_capacity; i++){
244
0
      if(!context->subs[i]){
245
0
        context->subs[i] = newleaf;
246
0
        context->subs_count++;
247
0
        assigned = true;
248
0
        break;
249
0
      }
250
0
    }
251
0
    if(assigned == false){
252
0
      subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1));
253
0
      if(!subs){
254
0
        sub__remove_shared_leaf(subhier, shared, newleaf);
255
0
        mosquitto_FREE(newleaf);
256
0
        return MOSQ_ERR_NOMEM;
257
0
      }
258
0
      context->subs = subs;
259
0
      context->subs_capacity++;
260
0
      context->subs_count++;
261
0
      context->subs[context->subs_capacity-1] = newleaf;
262
0
    }
263
0
#ifdef WITH_SYS_TREE
264
0
    db.shared_subscription_count++;
265
0
#endif
266
0
  }
267
268
0
  if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
269
0
    return rc;
270
0
  }else{
271
    /* mqttv311/mqttv5 requires retained messages are resent on
272
     * resubscribe. */
273
0
    return MOSQ_ERR_SUCCESS;
274
0
  }
275
0
}
276
277
278
static int sub__add_normal(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier)
279
0
{
280
0
  struct mosquitto__subleaf *newleaf = NULL;
281
0
  struct mosquitto__subleaf **subs;
282
0
  int rc;
283
284
0
  rc = sub__add_leaf(context, sub, &subhier->subs, &newleaf);
285
0
  if(rc > 0){
286
0
    return rc;
287
0
  }
288
289
0
  if(rc != MOSQ_ERR_SUB_EXISTS){
290
0
    newleaf->hier = subhier;
291
0
    newleaf->shared = NULL;
292
293
0
    bool assigned = false;
294
0
    for(int i=0; i<context->subs_capacity; i++){
295
0
      if(!context->subs[i]){
296
0
        context->subs[i] = newleaf;
297
0
        context->subs_count++;
298
0
        assigned = true;
299
0
        break;
300
0
      }
301
0
    }
302
0
    if(assigned == false){
303
0
      subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1));
304
0
      if(!subs){
305
0
        DL_DELETE(subhier->subs, newleaf);
306
0
        mosquitto_FREE(newleaf);
307
0
        return MOSQ_ERR_NOMEM;
308
0
      }
309
0
      context->subs = subs;
310
0
      context->subs_capacity++;
311
0
      context->subs_count++;
312
0
      context->subs[context->subs_capacity-1] = newleaf;
313
0
    }
314
0
#ifdef WITH_SYS_TREE
315
0
    db.subscription_count++;
316
0
#endif
317
0
  }
318
319
0
  if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
320
0
    return rc;
321
0
  }else{
322
    /* mqttv311/mqttv5 requires retained messages are resent on
323
     * resubscribe. */
324
0
    return MOSQ_ERR_SUCCESS;
325
0
  }
326
0
}
327
328
329
static int sub__add_context(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, char *const *const topics, const char *sharename)
330
0
{
331
0
  struct mosquitto__subhier *branch;
332
0
  int topic_index = 0;
333
0
  size_t topiclen;
334
335
  /* Find leaf node */
336
0
  while(topics && topics[topic_index] != NULL){
337
0
    topiclen = strlen(topics[topic_index]);
338
0
    if(topiclen > UINT16_MAX){
339
0
      return MOSQ_ERR_INVAL;
340
0
    }
341
0
    HASH_FIND(hh, subhier->children, topics[topic_index], topiclen, branch);
342
0
    if(!branch){
343
      /* Not found */
344
0
      branch = sub__add_hier_entry(subhier, &subhier->children, topics[topic_index], (uint16_t)topiclen);
345
0
      if(!branch){
346
0
        return MOSQ_ERR_NOMEM;
347
0
      }
348
0
    }
349
0
    subhier = branch;
350
0
    topic_index++;
351
0
  }
352
353
  /* Add add our context */
354
0
  if(context && context->id){
355
0
    if(sharename){
356
0
      return sub__add_shared(context, sub, subhier, sharename);
357
0
    }else{
358
0
      return sub__add_normal(context, sub, subhier);
359
0
    }
360
0
  }else{
361
0
    return MOSQ_ERR_SUCCESS;
362
0
  }
363
0
}
364
365
366
static int sub__remove_normal(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason)
367
0
{
368
0
  struct mosquitto__subleaf *leaf;
369
370
0
  leaf = subhier->subs;
371
0
  while(leaf){
372
0
    if(leaf->context==context){
373
0
#ifdef WITH_SYS_TREE
374
0
      db.subscription_count--;
375
0
#endif
376
0
      DL_DELETE(subhier->subs, leaf);
377
378
      /* Remove the reference to the sub that the client is keeping.
379
       * It would be nice to be able to use the reference directly,
380
       * but that would involve keeping a copy of the topic string in
381
       * each subleaf. Might be worth considering though. */
382
0
      for(int i=0; i<context->subs_capacity; i++){
383
0
        if(context->subs[i] && context->subs[i]->hier == subhier){
384
0
          context->subs_count--;
385
0
          mosquitto_free(context->subs[i]);
386
0
          context->subs[i] = NULL;
387
0
          break;
388
0
        }
389
0
      }
390
0
      *reason = 0;
391
0
      return MOSQ_ERR_SUCCESS;
392
0
    }
393
0
    leaf = leaf->next;
394
0
  }
395
0
  return MOSQ_ERR_NO_SUBSCRIBERS;
396
0
}
397
398
399
static int sub__remove_shared(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason, const char *sharename)
400
0
{
401
0
  struct mosquitto__subshared *shared;
402
403
0
  HASH_FIND(hh, subhier->shared, sharename, strlen(sharename), shared);
404
0
  if(shared){
405
0
    struct mosquitto__subleaf *leaf = shared->subs;
406
0
    while(leaf){
407
0
      if(leaf->context==context){
408
0
#ifdef WITH_SYS_TREE
409
0
        db.shared_subscription_count--;
410
0
#endif
411
0
        DL_DELETE(shared->subs, leaf);
412
413
        /* Remove the reference to the sub that the client is keeping.
414
        * It would be nice to be able to use the reference directly,
415
        * but that would involve keeping a copy of the topic string in
416
        * each subleaf. Might be worth considering though. */
417
0
        for(int i=0; i<context->subs_capacity; i++){
418
0
          if(context->subs[i]
419
0
              && context->subs[i]->hier == subhier
420
0
              && context->subs[i]->shared == shared){
421
422
0
            mosquitto_free(context->subs[i]);
423
0
            context->subs[i] = NULL;
424
0
            context->subs_count--;
425
0
            break;
426
0
          }
427
0
        }
428
429
0
        if(shared->subs == NULL){
430
0
          HASH_DELETE(hh, subhier->shared, shared);
431
0
          mosquitto_FREE(shared);
432
0
        }
433
434
0
        *reason = 0;
435
0
        return MOSQ_ERR_SUCCESS;
436
0
      }
437
0
      leaf = leaf->next;
438
0
    }
439
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
440
0
  }else{
441
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
442
0
  }
443
0
}
444
445
446
static int sub__remove_recurse(struct mosquitto *context, struct mosquitto__subhier *subhier, char **topics, uint8_t *reason, const char *sharename)
447
0
{
448
0
  struct mosquitto__subhier *branch;
449
450
0
  if(topics == NULL || topics[0] == NULL){
451
0
    if(sharename){
452
0
      return sub__remove_shared(context, subhier, reason, sharename);
453
0
    }else{
454
0
      return sub__remove_normal(context, subhier, reason);
455
0
    }
456
0
  }
457
458
0
  HASH_FIND(hh, subhier->children, topics[0], strlen(topics[0]), branch);
459
0
  if(branch){
460
0
    sub__remove_recurse(context, branch, &(topics[1]), reason, sharename);
461
0
    if(!branch->children && !branch->subs && !branch->shared){
462
0
      HASH_DELETE(hh, subhier->children, branch);
463
0
      mosquitto_FREE(branch);
464
0
    }
465
0
  }
466
0
  return MOSQ_ERR_SUCCESS;
467
0
}
468
469
470
static int sub__search(struct mosquitto__subhier *subhier, char **split_topics, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored)
471
0
{
472
  /* FIXME - need to take into account source_id if the client is a bridge */
473
0
  struct mosquitto__subhier *branch;
474
0
  int rc;
475
0
  bool have_subscribers = false;
476
477
0
  if(split_topics && split_topics[0]){
478
    /* Check for literal match */
479
0
    HASH_FIND(hh, subhier->children, split_topics[0], strlen(split_topics[0]), branch);
480
481
0
    if(branch){
482
0
      rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored);
483
0
      if(rc == MOSQ_ERR_SUCCESS){
484
0
        have_subscribers = true;
485
0
      }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
486
0
        return rc;
487
0
      }
488
0
      if(split_topics[1] == NULL){ /* End of list */
489
0
        rc = subs__process(branch, source_id, topic, qos, retain, stored);
490
0
        if(rc == MOSQ_ERR_SUCCESS){
491
0
          have_subscribers = true;
492
0
        }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
493
0
          return rc;
494
0
        }
495
0
      }
496
0
    }
497
498
    /* Check for + match */
499
0
    HASH_FIND_BYHASHVALUE(hh, subhier->children, "+", 1, hashv_plus, branch);
500
501
0
    if(branch){
502
0
      rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored);
503
0
      if(rc == MOSQ_ERR_SUCCESS){
504
0
        have_subscribers = true;
505
0
      }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
506
0
        return rc;
507
0
      }
508
0
      if(split_topics[1] == NULL){ /* End of list */
509
0
        rc = subs__process(branch, source_id, topic, qos, retain, stored);
510
0
        if(rc == MOSQ_ERR_SUCCESS){
511
0
          have_subscribers = true;
512
0
        }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
513
0
          return rc;
514
0
        }
515
0
      }
516
0
    }
517
0
  }
518
519
  /* Check for # match */
520
0
  HASH_FIND_BYHASHVALUE(hh, subhier->children, "#", 1, hashv_hash, branch);
521
0
  if(branch && !branch->children){
522
    /* The topic matches due to a # wildcard - process the
523
     * subscriptions but *don't* return. Although this branch has ended
524
     * there may still be other subscriptions to deal with.
525
     */
526
0
    rc = subs__process(branch, source_id, topic, qos, retain, stored);
527
0
    if(rc == MOSQ_ERR_SUCCESS){
528
0
      have_subscribers = true;
529
0
    }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
530
0
      return rc;
531
0
    }
532
0
  }
533
534
0
  if(have_subscribers){
535
0
    return MOSQ_ERR_SUCCESS;
536
0
  }else{
537
0
    return MOSQ_ERR_NO_SUBSCRIBERS;
538
0
  }
539
0
}
540
541
542
static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len)
543
0
{
544
0
  struct mosquitto__subhier *child;
545
546
0
  assert(sibling);
547
548
0
  child = mosquitto_calloc(1, sizeof(struct mosquitto__subhier) + len + 1);
549
0
  if(!child){
550
0
    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
551
0
    return NULL;
552
0
  }
553
0
  child->parent = parent;
554
0
  child->topic_len = len;
555
0
  if(len > 0){
556
0
    strncpy(child->topic, topic, len);
557
0
  }
558
559
0
  HASH_ADD(hh, *sibling, topic, child->topic_len, child);
560
561
0
  return child;
562
0
}
563
564
565
int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub)
566
0
{
567
0
  int rc = 0;
568
0
  struct mosquitto__subhier *subhier;
569
0
  const char *sharename = NULL;
570
0
  char *local_sub;
571
0
  char **topics;
572
0
  size_t topiclen;
573
574
0
  assert(sub);
575
0
  assert(sub->topic_filter);
576
577
0
  rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &topics, &sharename);
578
0
  if(rc){
579
0
    return rc;
580
0
  }
581
582
0
  topiclen = strlen(topics[0]);
583
0
  if(topiclen > UINT16_MAX){
584
0
    mosquitto_FREE(local_sub);
585
0
    mosquitto_FREE(topics);
586
0
    return MOSQ_ERR_INVAL;
587
0
  }
588
589
0
  if(sharename){
590
0
    HASH_FIND(hh, db.shared_subs, topics[0], topiclen, subhier);
591
0
    if(!subhier){
592
0
      subhier = sub__add_hier_entry(NULL, &db.shared_subs, topics[0], (uint16_t)topiclen);
593
0
      if(!subhier){
594
0
        mosquitto_FREE(local_sub);
595
0
        mosquitto_FREE(topics);
596
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
597
0
        return MOSQ_ERR_NOMEM;
598
0
      }
599
0
    }
600
0
  }else{
601
0
    HASH_FIND(hh, db.normal_subs, topics[0], topiclen, subhier);
602
0
    if(!subhier){
603
0
      subhier = sub__add_hier_entry(NULL, &db.normal_subs, topics[0], (uint16_t)topiclen);
604
0
      if(!subhier){
605
0
        mosquitto_FREE(local_sub);
606
0
        mosquitto_FREE(topics);
607
0
        log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
608
0
        return MOSQ_ERR_NOMEM;
609
0
      }
610
0
    }
611
0
  }
612
0
  rc = sub__add_context(context, sub, subhier, topics, sharename);
613
614
0
  mosquitto_FREE(local_sub);
615
0
  mosquitto_FREE(topics);
616
617
0
  return rc;
618
0
}
619
620
621
int sub__remove(struct mosquitto *context, const char *sub, uint8_t *reason)
622
0
{
623
0
  int rc = 0;
624
0
  struct mosquitto__subhier *subhier;
625
0
  const char *sharename = NULL;
626
0
  char *local_sub = NULL;
627
0
  char **topics = NULL;
628
629
0
  assert(sub);
630
631
0
  rc = sub__topic_tokenise(sub, &local_sub, &topics, &sharename);
632
0
  if(rc){
633
0
    return rc;
634
0
  }
635
636
0
  if(sharename){
637
0
    HASH_FIND(hh, db.shared_subs, topics[0], strlen(topics[0]), subhier);
638
0
  }else{
639
0
    HASH_FIND(hh, db.normal_subs, topics[0], strlen(topics[0]), subhier);
640
0
  }
641
0
  if(subhier){
642
0
    *reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED;
643
0
    rc = sub__remove_recurse(context, subhier, topics, reason, sharename);
644
0
  }
645
646
0
  mosquitto_FREE(local_sub);
647
0
  mosquitto_FREE(topics);
648
649
0
  return rc;
650
0
}
651
652
653
int sub__messages_queue(const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg **stored)
654
0
{
655
0
  int rc = MOSQ_ERR_SUCCESS, rc2;
656
0
  int rc_normal = MOSQ_ERR_NO_SUBSCRIBERS, rc_shared = MOSQ_ERR_NO_SUBSCRIBERS;
657
0
  struct mosquitto__subhier *subhier;
658
0
  char **split_topics = NULL;
659
0
  char *local_topic = NULL;
660
0
  unsigned hashv;
661
0
  size_t topiclen;
662
663
0
  assert(topic);
664
665
0
  if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)){
666
0
    return 1;
667
0
  }
668
669
  /* Protect this message until we have sent it to all
670
  clients - this is required because websockets client calls
671
  db__message_write(), which could remove the message if ref_count==0.
672
  */
673
0
  db__msg_store_ref_inc(*stored);
674
675
0
  topiclen = strlen(split_topics[0]);
676
0
  HASH_VALUE(split_topics[0], topiclen, hashv);
677
0
  HASH_FIND_BYHASHVALUE(hh, db.normal_subs, split_topics[0], topiclen, hashv, subhier);
678
0
  if(subhier){
679
0
    rc_normal = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
680
0
    if(rc_normal > 0){
681
0
      rc = rc_normal;
682
0
      goto end;
683
0
    }
684
0
  }
685
686
0
  HASH_FIND_BYHASHVALUE(hh, db.shared_subs, split_topics[0], topiclen, hashv, subhier);
687
0
  if(subhier){
688
0
    rc_shared = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
689
0
    if(rc_shared > 0){
690
0
      rc = rc_shared;
691
0
      goto end;
692
0
    }
693
0
  }
694
695
0
  if(rc_normal == MOSQ_ERR_NO_SUBSCRIBERS && rc_shared == MOSQ_ERR_NO_SUBSCRIBERS){
696
0
    rc = MOSQ_ERR_NO_SUBSCRIBERS;
697
0
  }
698
699
0
  if(retain){
700
0
    rc2 = retain__store(topic, *stored, split_topics, true);
701
0
    if(rc2){
702
0
      rc = rc2;
703
0
    }
704
0
  }
705
706
0
end:
707
0
  mosquitto_FREE(split_topics);
708
0
  mosquitto_FREE(local_topic);
709
  /* Remove our reference and free if needed. */
710
0
  db__msg_store_ref_dec(stored);
711
712
0
  return rc;
713
0
}
714
715
716
/* Remove a subhier element, and return its parent if that needs freeing as well. */
717
static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub)
718
0
{
719
0
  struct mosquitto__subhier *parent;
720
721
0
  if(!sub || !sub->parent){
722
0
    return NULL;
723
0
  }
724
725
0
  if(sub->children || sub->subs){
726
0
    return NULL;
727
0
  }
728
729
0
  parent = sub->parent;
730
0
  HASH_DELETE(hh, parent->children, sub);
731
0
  mosquitto_FREE(sub);
732
733
0
  if(parent->subs == NULL
734
0
      && parent->children == NULL
735
0
      && parent->shared == NULL
736
0
      && parent->parent){
737
738
0
    return parent;
739
0
  }else{
740
0
    return NULL;
741
0
  }
742
0
}
743
744
745
/* Remove all subscriptions for a client.
746
 */
747
int sub__clean_session(struct mosquitto *context)
748
1.60k
{
749
1.60k
  for(int i=0; i<context->subs_capacity; i++){
750
0
    if(context->subs[i] == NULL || context->subs[i]->hier == NULL){
751
0
      continue;
752
0
    }
753
754
0
    struct mosquitto__subhier *hier = context->subs[i]->hier;
755
0
    struct mosquitto__subleaf *leaf;
756
757
0
    plugin_persist__handle_subscription_delete(context, context->subs[i]->topic_filter);
758
0
    if(context->subs[i]->shared){
759
0
      leaf = context->subs[i]->shared->subs;
760
0
      while(leaf){
761
0
        if(leaf->context==context){
762
0
#ifdef WITH_SYS_TREE
763
0
          db.shared_subscription_count--;
764
0
#endif
765
0
          sub__remove_shared_leaf(context->subs[i]->hier, context->subs[i]->shared, leaf);
766
0
          break;
767
0
        }
768
0
        leaf = leaf->next;
769
0
      }
770
0
    }else{
771
0
      leaf = hier->subs;
772
0
      while(leaf){
773
0
        if(leaf->context==context){
774
0
#ifdef WITH_SYS_TREE
775
0
          db.subscription_count--;
776
0
#endif
777
0
          DL_DELETE(hier->subs, leaf);
778
0
          break;
779
0
        }
780
0
        leaf = leaf->next;
781
0
      }
782
0
    }
783
0
    mosquitto_FREE(context->subs[i]);
784
785
0
    if(hier->subs == NULL
786
0
        && hier->children == NULL
787
0
        && hier->shared == NULL
788
0
        && hier->parent){
789
790
0
      do{
791
0
        hier = tmp_remove_subs(hier);
792
0
      }while(hier);
793
0
    }
794
0
  }
795
1.60k
  mosquitto_FREE(context->subs);
796
1.60k
  context->subs_capacity = 0;
797
1.60k
  context->subs_count = 0;
798
799
1.60k
  return MOSQ_ERR_SUCCESS;
800
1.60k
}
801
802
803
void sub__tree_print(struct mosquitto__subhier *root, int level)
804
0
{
805
0
  int i;
806
0
  struct mosquitto__subhier *branch, *branch_tmp;
807
0
  struct mosquitto__subleaf *leaf;
808
809
0
  HASH_ITER(hh, root, branch, branch_tmp){
810
0
    if(level > -1){
811
0
      for(i=0; i<(level+2)*2; i++){
812
0
        printf(" ");
813
0
      }
814
0
      printf("%s", branch->topic);
815
0
      leaf = branch->subs;
816
0
      while(leaf){
817
0
        if(leaf->context){
818
0
          printf(" (%s, %d)", leaf->context->id, MQTT_SUB_OPT_GET_QOS(leaf->subscription_options));
819
0
        }else{
820
0
          printf(" (%s, %d)", "", MQTT_SUB_OPT_GET_QOS(leaf->subscription_options));
821
0
        }
822
0
        leaf = leaf->next;
823
0
      }
824
0
      printf("\n");
825
0
    }
826
827
0
    sub__tree_print(branch->children, level+1);
828
0
  }
829
0
}
830
831
832
int sub__init(void)
833
0
{
834
0
  HASH_VALUE("+", 1, hashv_plus);
835
0
  HASH_VALUE("#", 1, hashv_hash);
836
837
0
  if(sub__add_hier_entry(NULL, &db.shared_subs, "", 0) == NULL
838
0
      || sub__add_hier_entry(NULL, &db.normal_subs, "", 0) == NULL
839
0
      || sub__add_hier_entry(NULL, &db.normal_subs, "$SYS", (uint16_t)strlen("$SYS")) == NULL
840
0
      ){
841
842
0
    return MOSQ_ERR_NOMEM;
843
0
  }else{
844
0
    return MOSQ_ERR_SUCCESS;
845
0
  }
846
0
}