Coverage Report

Created: 2025-11-24 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/src/retain.c
Line
Count
Source
1
/*
2
Copyright (c) 2010-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <stdio.h>
23
#include <string.h>
24
25
#include "mosquitto_broker_internal.h"
26
#include "mosquitto/mqtt_protocol.h"
27
#include "util_mosq.h"
28
29
#include "utlist.h"
30
31
/* Earliest db.now_s value at which retain__expiry_check() will next sweep the
 * retained tree for expired messages. */
static time_t next_expire_check = 0;
32
33
static struct mosquitto__retainhier *retain__add_hier_entry(struct mosquitto__retainhier *parent, struct mosquitto__retainhier **sibling, const char *topic, uint16_t len)
34
0
{
35
0
  struct mosquitto__retainhier *child;
36
37
0
  assert(sibling);
38
39
0
  child = mosquitto_calloc(1, sizeof(struct mosquitto__retainhier) + len + 1);
40
0
  if(!child){
41
0
    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
42
0
    return NULL;
43
0
  }
44
0
  child->parent = parent;
45
0
  child->topic_len = len;
46
0
  strncpy(child->topic, topic, len);
47
48
0
  HASH_ADD(hh, *sibling, topic, child->topic_len, child);
49
50
0
  return child;
51
0
}
52
53
54
int retain__init(void)
55
0
{
56
0
  struct mosquitto__retainhier *retainhier;
57
58
0
  retainhier = retain__add_hier_entry(NULL, &db.retains, "", 0);
59
0
  if(!retainhier){
60
0
    return MOSQ_ERR_NOMEM;
61
0
  }
62
63
0
  retainhier = retain__add_hier_entry(NULL, &db.retains, "$SYS", (uint16_t)strlen("$SYS"));
64
0
  if(!retainhier){
65
0
    return MOSQ_ERR_NOMEM;
66
0
  }
67
68
0
  return MOSQ_ERR_SUCCESS;
69
0
}
70
71
72
BROKER_EXPORT int mosquitto_persist_retain_msg_set(const char *topic, uint64_t base_msg_id)
73
0
{
74
0
  struct mosquitto__base_msg *base_msg;
75
0
  int rc = MOSQ_ERR_UNKNOWN;
76
0
  char **split_topics = NULL;
77
0
  char *local_topic = NULL;
78
79
0
  if(topic == NULL){
80
0
    return MOSQ_ERR_INVAL;
81
0
  }
82
83
0
  HASH_FIND(hh, db.msg_store, &base_msg_id, sizeof(base_msg_id), base_msg);
84
0
  if(base_msg){
85
0
    if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)){
86
0
      return MOSQ_ERR_NOMEM;
87
0
    }
88
89
0
    rc = retain__store(topic, base_msg, split_topics, false);
90
0
    mosquitto_free(split_topics);
91
0
    mosquitto_free(local_topic);
92
0
  }
93
94
0
  return rc;
95
0
}
96
97
98
BROKER_EXPORT int mosquitto_persist_retain_msg_delete(const char *topic)
99
0
{
100
0
  struct mosquitto__base_msg base_msg;
101
0
  int rc = MOSQ_ERR_UNKNOWN;
102
0
  char **split_topics = NULL;
103
0
  char *local_topic = NULL;
104
105
0
  if(topic == NULL){
106
0
    return MOSQ_ERR_INVAL;
107
0
  }
108
109
0
  memset(&base_msg, 0, sizeof(base_msg));
110
0
  base_msg.ref_count = 10; /* Ensure this isn't freed */
111
112
0
  if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)){
113
0
    return MOSQ_ERR_NOMEM;
114
0
  }
115
116
  /* With stored->payloadlen == 0, this means the message will be removed */
117
0
  rc = retain__store(topic, &base_msg, split_topics, false);
118
0
  mosquitto_FREE(split_topics);
119
0
  mosquitto_FREE(local_topic);
120
121
0
  return rc;
122
0
}
123
124
125
void retain__clean_empty_hierarchy(struct mosquitto__retainhier *retainhier)
126
0
{
127
0
  while(retainhier){
128
0
    if(retainhier->children || retainhier->retained || retainhier->parent == NULL){
129
      /* Entry is being used */
130
0
      return;
131
0
    }else{
132
0
      HASH_DELETE(hh, retainhier->parent->children, retainhier);
133
134
0
      struct mosquitto__retainhier *parent = retainhier->parent;
135
0
      mosquitto_FREE(retainhier);
136
0
      retainhier = parent;
137
0
    }
138
0
  }
139
0
}
140
141
142
int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char **split_topics, bool persist)
143
0
{
144
0
  struct mosquitto__retainhier *retainhier;
145
0
  struct mosquitto__retainhier *branch;
146
0
  size_t slen;
147
148
0
  assert(base_msg);
149
0
  assert(split_topics);
150
151
0
  HASH_FIND(hh, db.retains, split_topics[0], strlen(split_topics[0]), retainhier);
152
0
  if(retainhier == NULL){
153
0
    retainhier = retain__add_hier_entry(NULL, &db.retains, split_topics[0], (uint16_t)strlen(split_topics[0]));
154
0
    if(!retainhier){
155
0
      return MOSQ_ERR_NOMEM;
156
0
    }
157
0
  }
158
159
0
  for(int i=0; split_topics[i] != NULL; i++){
160
0
    slen = strlen(split_topics[i]);
161
0
    HASH_FIND(hh, retainhier->children, split_topics[i], slen, branch);
162
0
    if(branch == NULL){
163
0
      branch = retain__add_hier_entry(retainhier, &retainhier->children, split_topics[i], (uint16_t)slen);
164
0
      if(branch == NULL){
165
0
        return MOSQ_ERR_NOMEM;
166
0
      }
167
0
    }
168
0
    retainhier = branch;
169
0
  }
170
171
0
#ifdef WITH_PERSISTENCE
172
0
  if(strncmp(topic, "$SYS", 4)){
173
    /* Retained messages count as a persistence change, but only if
174
     * they aren't for $SYS. */
175
0
    db.persistence_changes++;
176
0
  }
177
#else
178
  UNUSED(topic);
179
#endif
180
181
0
  if(retainhier->retained){
182
0
    if(persist && retainhier->retained->data.topic[0] != '$' && base_msg->data.payloadlen == 0){
183
      /* Only delete if another retained message isn't replacing this one */
184
0
      plugin_persist__handle_retain_msg_delete(retainhier->retained);
185
0
    }
186
0
    db__msg_store_ref_dec(&retainhier->retained);
187
0
#ifdef WITH_SYS_TREE
188
0
    db.retained_count--;
189
0
#endif
190
0
    if(base_msg->data.payloadlen == 0){
191
0
      retainhier->retained = NULL;
192
0
      retain__clean_empty_hierarchy(retainhier);
193
0
    }
194
0
  }
195
0
  if(base_msg->data.payloadlen){
196
0
    retainhier->retained = base_msg;
197
0
    db__msg_store_ref_inc(retainhier->retained);
198
0
    if(persist && retainhier->retained->data.topic[0] != '$'){
199
0
      plugin_persist__handle_base_msg_add(retainhier->retained);
200
0
      plugin_persist__handle_retain_msg_set(retainhier->retained);
201
0
    }
202
0
#ifdef WITH_SYS_TREE
203
0
    db.retained_count++;
204
0
#endif
205
0
  }
206
207
0
  return MOSQ_ERR_SUCCESS;
208
0
}
209
210
211
static bool retain__delete_expired_msg(struct mosquitto__retainhier *branch)
212
0
{
213
0
  if(branch->retained && branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
214
0
    plugin_persist__handle_retain_msg_delete(branch->retained);
215
0
    db__msg_store_ref_dec(&branch->retained);
216
0
    branch->retained = NULL;
217
0
#ifdef WITH_SYS_TREE
218
0
    db.retained_count--;
219
0
#endif
220
0
    return true;
221
0
  }
222
0
  return false;
223
0
}
224
225
226
static int retain__process(struct mosquitto__retainhier *branch, struct mosquitto *context, const struct mosquitto_subscription *sub)
227
0
{
228
0
  int rc = 0;
229
0
  uint8_t qos, sub_qos;
230
0
  uint16_t mid;
231
0
  struct mosquitto__base_msg *retained;
232
233
0
  if(retain__delete_expired_msg(branch)){
234
0
    return MOSQ_ERR_SUCCESS;
235
0
  }
236
237
0
  retained = branch->retained;
238
239
0
  rc = mosquitto_acl_check(context, retained->data.topic, retained->data.payloadlen, retained->data.payload,
240
0
      retained->data.qos, retained->data.retain, MOSQ_ACL_READ);
241
0
  if(rc == MOSQ_ERR_ACL_DENIED){
242
0
    return MOSQ_ERR_SUCCESS;
243
0
  }else if(rc != MOSQ_ERR_SUCCESS){
244
0
    return rc;
245
0
  }
246
247
  /* Check for original source access */
248
0
  if(db.config->check_retain_source && retained->origin != mosq_mo_broker && retained->data.source_id){
249
0
    struct mosquitto retain_ctxt;
250
0
    memset(&retain_ctxt, 0, sizeof(struct mosquitto));
251
252
0
    retain_ctxt.id = retained->data.source_id;
253
0
    retain_ctxt.username = retained->data.source_username;
254
0
    retain_ctxt.listener = retained->source_listener;
255
256
0
    rc = mosquitto_acl_check(&retain_ctxt, retained->data.topic, retained->data.payloadlen, retained->data.payload,
257
0
        retained->data.qos, retained->data.retain, MOSQ_ACL_WRITE);
258
0
    if(rc == MOSQ_ERR_ACL_DENIED){
259
0
      return MOSQ_ERR_SUCCESS;
260
0
    }else if(rc != MOSQ_ERR_SUCCESS){
261
0
      return rc;
262
0
    }
263
0
  }
264
265
0
  sub_qos = sub->options & 0x03;
266
0
  if(db.config->upgrade_outgoing_qos){
267
0
    qos = sub_qos;
268
0
  }else{
269
0
    qos = retained->data.qos;
270
0
    if(qos > sub_qos){
271
0
      qos = sub_qos;
272
0
    }
273
0
  }
274
0
  if(qos > 0){
275
0
    mid = mosquitto__mid_generate(context);
276
0
  }else{
277
0
    mid = 0;
278
0
  }
279
0
  return db__message_insert_outgoing(context, 0, mid, qos, true, retained, sub->identifier, false, true);
280
0
}
281
282
283
static int retain__search(struct mosquitto__retainhier *retainhier, char **split_topics, struct mosquitto *context, const struct mosquitto_subscription *sub, int level)
284
0
{
285
0
  struct mosquitto__retainhier *branch, *branch_tmp;
286
0
  int flag = 0;
287
288
0
  if(!strcmp(split_topics[0], "#") && split_topics[1] == NULL){
289
0
    HASH_ITER(hh, retainhier->children, branch, branch_tmp){
290
      /* Set flag to indicate that we should check for retained messages
291
       * on "foo" when we are subscribing to e.g. "foo/#" and then exit
292
       * this function and return to an earlier retain__search().
293
       */
294
0
      flag = -1;
295
0
      if(branch->retained){
296
0
        retain__process(branch, context, sub);
297
0
      }
298
0
      if(branch->children){
299
0
        retain__search(branch, split_topics, context, sub, level+1);
300
0
      }
301
0
    }
302
0
  }else{
303
0
    if(!strcmp(split_topics[0], "+")){
304
0
      HASH_ITER(hh, retainhier->children, branch, branch_tmp){
305
0
        if(split_topics[1] != NULL){
306
0
          if(retain__search(branch, &(split_topics[1]), context, sub, level+1) == -1
307
0
              || (split_topics[1] != NULL && !strcmp(split_topics[1], "#") && level>0)){
308
309
0
            if(branch->retained){
310
0
              retain__process(branch, context, sub);
311
0
            }
312
0
          }
313
0
        }else{
314
0
          if(branch->retained){
315
0
            retain__process(branch, context, sub);
316
0
          }
317
0
        }
318
0
      }
319
0
    }else{
320
0
      HASH_FIND(hh, retainhier->children, split_topics[0], strlen(split_topics[0]), branch);
321
0
      if(branch){
322
0
        if(split_topics[1] != NULL){
323
0
          if(retain__search(branch, &(split_topics[1]), context, sub, level+1) == -1
324
0
              || (split_topics[1] != NULL && !strcmp(split_topics[1], "#") && level>0)){
325
326
0
            if(branch->retained){
327
0
              retain__process(branch, context, sub);
328
0
            }
329
0
          }
330
0
        }else{
331
0
          if(branch->retained){
332
0
            retain__process(branch, context, sub);
333
0
          }
334
0
        }
335
0
      }
336
0
    }
337
0
  }
338
0
  return flag;
339
0
}
340
341
342
int retain__queue(struct mosquitto *context, const struct mosquitto_subscription *sub)
343
0
{
344
0
  struct mosquitto__retainhier *retainhier;
345
0
  char *local_sub;
346
0
  char **split_topics;
347
0
  int rc;
348
349
0
  assert(context);
350
0
  assert(sub);
351
352
0
  if(!strncmp(sub->topic_filter, "$share/", strlen("$share/"))){
353
0
    return MOSQ_ERR_SUCCESS;
354
0
  }
355
356
0
  rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &split_topics, NULL);
357
0
  if(rc){
358
0
    return rc;
359
0
  }
360
361
0
  HASH_FIND(hh, db.retains, split_topics[0], strlen(split_topics[0]), retainhier);
362
363
0
  if(retainhier){
364
0
    retain__search(retainhier, split_topics, context, sub, 0);
365
0
  }
366
0
  mosquitto_FREE(local_sub);
367
0
  mosquitto_FREE(split_topics);
368
369
0
  return MOSQ_ERR_SUCCESS;
370
0
}
371
372
373
void retain__expire(struct mosquitto__retainhier **retainhier)
374
0
{
375
0
  struct mosquitto__retainhier *peer, *retainhier_tmp;
376
377
0
  HASH_ITER(hh, *retainhier, peer, retainhier_tmp){
378
0
    retain__expire(&peer->children);
379
0
    if(retain__delete_expired_msg(peer)){
380
0
      retain__clean_empty_hierarchy(peer);
381
0
    }
382
0
  }
383
0
}
384
385
386
void retain__clean(struct mosquitto__retainhier **retainhier)
387
0
{
388
0
  struct mosquitto__retainhier *peer, *retainhier_tmp;
389
390
0
  HASH_ITER(hh, *retainhier, peer, retainhier_tmp){
391
0
    if(peer->retained){
392
0
      db__msg_store_ref_dec(&peer->retained);
393
0
    }
394
0
    retain__clean(&peer->children);
395
396
0
    HASH_DELETE(hh, *retainhier, peer);
397
0
    mosquitto_FREE(peer);
398
0
  }
399
0
}
400
401
402
void retain__expiry_check(void)
403
0
{
404
0
  if(db.config->retain_expiry_interval > 0 && db.now_s > next_expire_check){
405
0
    retain__expire(&db.retains);
406
0
    next_expire_check = db.now_s + db.config->retain_expiry_interval;
407
0
  }
408
0
}