Coverage Report

Created: 2025-11-07 06:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/src/database.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <stdio.h>
23
#include <utlist.h>
24
25
#include "mosquitto_broker_internal.h"
26
#include "send_mosq.h"
27
#include "sys_tree.h"
28
#include "util_mosq.h"
29
30
31
/**
32
 * Is this context ready to take more in flight messages right now?
33
 * @param context the client context of interest
34
 * @param qos qos for the packet of interest
35
 * @return true if more in flight are allowed.
36
 */
37
bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos)
38
0
{
39
0
  struct mosquitto_msg_data *msgs;
40
0
  bool valid_bytes;
41
0
  bool valid_count;
42
43
0
  if(dir == mosq_md_out){
44
0
    msgs = &context->msgs_out;
45
0
  }else{
46
0
    msgs = &context->msgs_in;
47
0
  }
48
49
0
  if(msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0){
50
0
    return true;
51
0
  }
52
53
0
  if(qos == 0){
54
    /* Deliver QoS 0 messages unless the queue is already full.
55
     * For QoS 0 messages the choice is either "inflight" or dropped.
56
     * There is no queueing option, unless the client is offline and
57
     * queue_qos0_messages is enabled.
58
     */
59
0
    if(db.config->max_queued_messages == 0 && db.config->max_inflight_bytes == 0){
60
0
      return true;
61
0
    }
62
0
    valid_bytes = ((msgs->inflight_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes);
63
0
    if(dir == mosq_md_out){
64
0
      valid_count = context->out_packet_count < db.config->max_queued_messages;
65
0
    }else{
66
0
      valid_count = msgs->inflight_count - msgs->inflight_maximum < db.config->max_queued_messages;
67
0
    }
68
69
0
    if(db.config->max_queued_messages == 0){
70
0
      return valid_bytes;
71
0
    }
72
0
    if(db.config->max_queued_bytes == 0){
73
0
      return valid_count;
74
0
    }
75
0
  }else{
76
0
    valid_bytes = (ssize_t)msgs->inflight_bytes12 < (ssize_t)db.config->max_inflight_bytes;
77
0
    valid_count = msgs->inflight_quota > 0;
78
79
0
    if(msgs->inflight_maximum == 0){
80
0
      return valid_bytes;
81
0
    }
82
0
    if(db.config->max_inflight_bytes == 0){
83
0
      return valid_count;
84
0
    }
85
0
  }
86
87
0
  return valid_bytes && valid_count;
88
0
}
89
90
91
/**
92
 * For a given client context, are more messages allowed to be queued?
93
 * It is assumed that inflight checks and queue_qos0 checks have already
94
 * been made.
95
 * @param context client of interest
96
 * @param qos destination qos for the packet of interest
97
 * @return true if queuing is allowed, false if should be dropped
98
 */
99
bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data)
100
0
{
101
0
  int source_count;
102
0
  int adjust_count;
103
0
  long source_bytes;
104
0
  ssize_t adjust_bytes = (ssize_t)db.config->max_inflight_bytes;
105
0
  bool valid_bytes;
106
0
  bool valid_count;
107
108
0
  if(db.config->max_queued_messages == 0 && db.config->max_queued_bytes == 0){
109
0
    return true;
110
0
  }
111
112
0
  if(qos == 0 && db.config->queue_qos0_messages == false){
113
0
    return false; /* This case is handled in db__ready_for_flight() */
114
0
  }else{
115
0
    source_bytes = (ssize_t)msg_data->queued_bytes12;
116
0
    source_count = msg_data->queued_count12;
117
0
  }
118
0
  adjust_count = msg_data->inflight_maximum;
119
120
  /* nothing in flight for offline clients */
121
0
  if(!net__is_connected(context)){
122
0
    adjust_bytes = 0;
123
0
    adjust_count = 0;
124
0
  }
125
126
0
  valid_bytes = (source_bytes - (ssize_t)adjust_bytes) < (ssize_t)db.config->max_queued_bytes;
127
0
  valid_count = source_count - adjust_count < db.config->max_queued_messages;
128
129
0
  if(db.config->max_queued_bytes == 0){
130
0
    return valid_count;
131
0
  }
132
0
  if(db.config->max_queued_messages == 0){
133
0
    return valid_bytes;
134
0
  }
135
136
0
  return valid_bytes && valid_count;
137
0
}
138
139
140
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg)
141
0
{
142
0
  msg_data->inflight_count++;
143
0
  msg_data->inflight_bytes += client_msg->base_msg->data.payloadlen;
144
0
  if(client_msg->data.qos != 0){
145
0
    msg_data->inflight_count12++;
146
0
    msg_data->inflight_bytes12 += client_msg->base_msg->data.payloadlen;
147
0
  }
148
0
}
149
150
151
static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg)
152
0
{
153
0
  msg_data->inflight_count--;
154
0
  msg_data->inflight_bytes -= client_msg->base_msg->data.payloadlen;
155
0
  if(client_msg->data.qos != 0){
156
0
    msg_data->inflight_count12--;
157
0
    msg_data->inflight_bytes12 -= client_msg->base_msg->data.payloadlen;
158
0
  }
159
0
}
160
161
162
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg)
163
0
{
164
0
  msg_data->queued_count++;
165
0
  msg_data->queued_bytes += client_msg->base_msg->data.payloadlen;
166
0
  if(client_msg->data.qos != 0){
167
0
    msg_data->queued_count12++;
168
0
    msg_data->queued_bytes12 += client_msg->base_msg->data.payloadlen;
169
0
  }
170
0
}
171
172
173
static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg)
174
0
{
175
0
  msg_data->queued_count--;
176
0
  msg_data->queued_bytes -= client_msg->base_msg->data.payloadlen;
177
0
  if(client_msg->data.qos != 0){
178
0
    msg_data->queued_count12--;
179
0
    msg_data->queued_bytes12 -= client_msg->base_msg->data.payloadlen;
180
0
  }
181
0
}
182
183
184
int db__open(struct mosquitto__config *config)
185
0
{
186
0
  if(!config){
187
0
    return MOSQ_ERR_INVAL;
188
0
  }
189
190
0
  db.contexts_by_id = NULL;
191
0
  db.contexts_by_sock = NULL;
192
0
  db.contexts_for_free = NULL;
193
0
#ifdef WITH_BRIDGE
194
0
  db.bridges = NULL;
195
0
  db.bridge_count = 0;
196
0
#endif
197
198
  /* Initialize the hashtable */
199
0
  db.clientid_index_hash = NULL;
200
201
0
  db.normal_subs = NULL;
202
0
  db.shared_subs = NULL;
203
204
0
  sub__init();
205
0
  retain__init();
206
207
0
  db.config->security_options.unpwd = NULL;
208
209
0
#ifdef WITH_PERSISTENCE
210
0
  if(persist__restore()){
211
0
    return 1;
212
0
  }
213
0
#endif
214
215
0
  return MOSQ_ERR_SUCCESS;
216
0
}
217
218
219
static void subhier_clean(struct mosquitto__subhier **subhier)
220
0
{
221
0
  struct mosquitto__subhier *peer, *subhier_tmp;
222
0
  struct mosquitto__subleaf *leaf, *nextleaf;
223
224
0
  HASH_ITER(hh, *subhier, peer, subhier_tmp){
225
0
    leaf = peer->subs;
226
0
    while(leaf){
227
0
      nextleaf = leaf->next;
228
0
      mosquitto_FREE(leaf);
229
0
      leaf = nextleaf;
230
0
    }
231
0
    subhier_clean(&peer->children);
232
233
0
    HASH_DELETE(hh, *subhier, peer);
234
0
    mosquitto_FREE(peer);
235
0
  }
236
0
}
237
238
239
int db__close(void)
240
0
{
241
0
  subhier_clean(&db.normal_subs);
242
0
  subhier_clean(&db.shared_subs);
243
0
  retain__clean(&db.retains);
244
0
  db__msg_store_clean();
245
246
0
  return MOSQ_ERR_SUCCESS;
247
0
}
248
249
250
int db__msg_store_add(struct mosquitto__base_msg *base_msg)
251
0
{
252
0
  struct mosquitto__base_msg *found;
253
0
  unsigned hashv;
254
255
0
  HASH_VALUE(&base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv);
256
0
  HASH_FIND_BYHASHVALUE(hh, db.msg_store, &base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv, found);
257
0
  if(found == NULL){
258
0
    HASH_ADD_KEYPTR_BYHASHVALUE(hh, db.msg_store, &base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv, base_msg);
259
0
    return MOSQ_ERR_SUCCESS;
260
0
  }else{
261
0
    return MOSQ_ERR_ALREADY_EXISTS;
262
0
  }
263
0
}
264
265
266
void db__msg_store_free(struct mosquitto__base_msg *base_msg)
267
0
{
268
0
  mosquitto_FREE(base_msg->data.source_id);
269
0
  mosquitto_FREE(base_msg->data.source_username);
270
0
  if(base_msg->dest_ids){
271
0
    for(int i=0; i<base_msg->dest_id_count; i++){
272
0
      mosquitto_FREE(base_msg->dest_ids[i]);
273
0
    }
274
0
    mosquitto_FREE(base_msg->dest_ids);
275
0
  }
276
0
  mosquitto_FREE(base_msg->data.topic);
277
0
  mosquitto_property_free_all(&base_msg->data.properties);
278
0
  mosquitto_FREE(base_msg->data.payload);
279
0
  mosquitto_FREE(base_msg);
280
0
}
281
282
283
void db__msg_store_remove(struct mosquitto__base_msg *base_msg, bool notify)
284
0
{
285
0
  if(base_msg == NULL){
286
0
    return;
287
0
  }
288
0
  HASH_DELETE(hh, db.msg_store, base_msg);
289
0
  db.msg_store_count--;
290
0
  db.msg_store_bytes -= base_msg->data.payloadlen;
291
0
  if(notify == true){
292
0
    plugin_persist__handle_base_msg_delete(base_msg);
293
0
  }
294
0
  db__msg_store_free(base_msg);
295
0
}
296
297
298
void db__msg_store_clean(void)
299
0
{
300
0
  struct mosquitto__base_msg *base_msg, *base_msg_tmp;
301
302
0
  HASH_ITER(hh, db.msg_store, base_msg, base_msg_tmp){
303
0
    db__msg_store_remove(base_msg, false);
304
0
  }
305
0
}
306
307
308
void db__msg_store_ref_inc(struct mosquitto__base_msg *base_msg)
309
0
{
310
0
  base_msg->ref_count++;
311
0
}
312
313
314
void db__msg_store_ref_dec(struct mosquitto__base_msg **base_msg)
315
0
{
316
0
  (*base_msg)->ref_count--;
317
0
  if((*base_msg)->ref_count == 0){
318
0
    db__msg_store_remove(*base_msg, true);
319
0
    *base_msg = NULL;
320
0
  }
321
0
}
322
323
324
void db__msg_store_compact(void)
325
0
{
326
0
  struct mosquitto__base_msg *base_msg, *base_msg_tmp;
327
328
0
  HASH_ITER(hh, db.msg_store, base_msg, base_msg_tmp){
329
0
    if(base_msg->ref_count < 1){
330
0
      db__msg_store_remove(base_msg, true);
331
0
    }
332
0
  }
333
0
}
334
335
336
static void db__message_remove_inflight(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *item)
337
0
{
338
0
  if(!context || !msg_data || !item){
339
0
    return;
340
0
  }
341
342
0
  plugin_persist__handle_client_msg_delete(context, item);
343
344
0
  DL_DELETE(msg_data->inflight, item);
345
0
  if(item->base_msg){
346
0
    db__msg_remove_from_inflight_stats(msg_data, item);
347
0
    db__msg_store_ref_dec(&item->base_msg);
348
0
  }
349
350
0
  mosquitto_FREE(item);
351
0
}
352
353
354
static void db__message_remove_queued(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *item)
355
0
{
356
0
  if(!context || !msg_data || !item){
357
0
    return;
358
0
  }
359
360
0
  plugin_persist__handle_client_msg_delete(context, item);
361
362
0
  DL_DELETE(msg_data->queued, item);
363
0
  if(item->base_msg){
364
0
    db__msg_remove_from_queued_stats(msg_data, item);
365
0
    db__msg_store_ref_dec(&item->base_msg);
366
0
  }
367
368
0
  mosquitto_FREE(item);
369
0
}
370
371
372
static void db__fill_inflight_out_from_queue(struct mosquitto *context)
373
0
{
374
0
  struct mosquitto__client_msg *client_msg, *tmp;
375
376
0
  DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
377
0
    if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){
378
0
      return;
379
0
    }
380
0
    switch(client_msg->data.qos){
381
0
      case 0:
382
0
        client_msg->data.state = mosq_ms_publish_qos0;
383
0
        break;
384
0
      case 1:
385
0
        client_msg->data.state = mosq_ms_publish_qos1;
386
0
        break;
387
0
      case 2:
388
0
        client_msg->data.state = mosq_ms_publish_qos2;
389
0
        break;
390
0
    }
391
0
    if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
392
0
      db__message_remove_queued(context, &context->msgs_out, client_msg);
393
0
      continue;
394
0
    }
395
0
    plugin_persist__handle_client_msg_update(context, client_msg);
396
0
    db__message_dequeue_first(context, &context->msgs_out);
397
0
  }
398
0
}
399
400
401
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data)
402
0
{
403
0
  struct mosquitto__client_msg *client_msg;
404
405
0
  UNUSED(context);
406
407
0
  client_msg = msg_data->queued;
408
0
  DL_DELETE(msg_data->queued, client_msg);
409
0
  DL_APPEND(msg_data->inflight, client_msg);
410
0
  if(msg_data->inflight_quota > 0){
411
0
    msg_data->inflight_quota--;
412
0
  }
413
414
0
  db__msg_remove_from_queued_stats(msg_data, client_msg);
415
0
  db__msg_add_to_inflight_stats(msg_data, client_msg);
416
0
}
417
418
419
int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos)
420
0
{
421
0
  struct mosquitto__client_msg *client_msg, *tmp;
422
0
  bool deleted = false;
423
424
0
  if(!context){
425
0
    return MOSQ_ERR_INVAL;
426
0
  }
427
428
0
  DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){
429
0
    if(client_msg->data.mid == mid){
430
0
      if(client_msg->data.qos != qos){
431
0
        return MOSQ_ERR_PROTOCOL;
432
0
      }else if(qos == 2 && client_msg->data.state != expect_state && expect_state != mosq_ms_any){
433
0
        return MOSQ_ERR_PROTOCOL;
434
0
      }
435
0
      db__message_remove_inflight(context, &context->msgs_out, client_msg);
436
0
      deleted = true;
437
0
      break;
438
0
    }
439
0
  }
440
441
0
  if(deleted == false){
442
0
    DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
443
0
      if(client_msg->data.mid == mid){
444
0
        if(client_msg->data.qos != qos){
445
0
          return MOSQ_ERR_PROTOCOL;
446
0
        }else if(qos == 2 && client_msg->data.state != expect_state && expect_state != mosq_ms_any){
447
0
          return MOSQ_ERR_PROTOCOL;
448
0
        }
449
0
        db__message_remove_queued(context, &context->msgs_out, client_msg);
450
0
        break;
451
0
      }
452
0
    }
453
0
  }
454
0
  db__fill_inflight_out_from_queue(context);
455
0
#ifdef WITH_PERSISTENCE
456
0
  db.persistence_changes++;
457
0
#endif
458
459
0
  return db__message_write_inflight_out_latest(context);
460
0
}
461
462
463
/* Only for QoS 2 messages */
464
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto__base_msg *base_msg, bool persist)
465
0
{
466
0
  struct mosquitto__client_msg *client_msg;
467
0
  struct mosquitto_msg_data *msg_data;
468
0
  enum mosquitto_msg_state state = mosq_ms_invalid;
469
0
  int rc = 0;
470
471
0
  assert(base_msg);
472
0
  if(!context){
473
0
    return MOSQ_ERR_INVAL;
474
0
  }
475
0
  if(!context->id){
476
    /* Protect against unlikely "client is disconnected but not entirely freed" scenario */
477
0
    return MOSQ_ERR_SUCCESS;
478
479
0
  }
480
0
  msg_data = &context->msgs_in;
481
482
0
  if(db__ready_for_flight(context, mosq_md_in, base_msg->data.qos)){
483
0
    state = mosq_ms_wait_for_pubrel;
484
0
  }else if(base_msg->data.qos != 0 && db__ready_for_queue(context, base_msg->data.qos, msg_data)){
485
0
    state = mosq_ms_queued;
486
0
    rc = 2;
487
0
  }else{
488
    /* Dropping message due to full queue. */
489
0
    if(context->is_dropping == false){
490
0
      context->is_dropping = true;
491
0
      log__printf(NULL, MOSQ_LOG_NOTICE,
492
0
          "Outgoing messages are being dropped for client %s.",
493
0
          context->id);
494
0
    }
495
0
    metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
496
0
    context->stats.messages_dropped++;
497
498
0
    return 2;
499
0
  }
500
501
0
  assert(state != mosq_ms_invalid);
502
503
0
#ifdef WITH_PERSISTENCE
504
0
  if(state == mosq_ms_queued){
505
0
    db.persistence_changes++;
506
0
  }
507
0
#endif
508
509
0
  client_msg = mosquitto_malloc(sizeof(struct mosquitto__client_msg));
510
0
  if(!client_msg){
511
0
    return MOSQ_ERR_NOMEM;
512
0
  }
513
0
  client_msg->prev = NULL;
514
0
  client_msg->next = NULL;
515
0
  if(cmsg_id){
516
0
    client_msg->data.cmsg_id = cmsg_id;
517
0
  }else{
518
0
    client_msg->data.cmsg_id = ++context->last_cmsg_id;
519
0
  }
520
0
  client_msg->base_msg = base_msg;
521
0
  db__msg_store_ref_inc(client_msg->base_msg);
522
0
  client_msg->data.mid = base_msg->data.source_mid;
523
0
  client_msg->data.direction = mosq_md_in;
524
0
  client_msg->data.state = (enum mosquitto_msg_state)state;
525
0
  client_msg->data.dup = false;
526
0
  if(base_msg->data.qos > context->max_qos){
527
0
    client_msg->data.qos = context->max_qos;
528
0
  }else{
529
0
    client_msg->data.qos = base_msg->data.qos;
530
0
  }
531
0
  client_msg->data.retain = base_msg->data.retain;
532
0
  client_msg->data.subscription_identifier = 0;
533
534
0
  if(state == mosq_ms_queued){
535
0
    DL_APPEND(msg_data->queued, client_msg);
536
0
    db__msg_add_to_queued_stats(msg_data, client_msg);
537
0
  }else{
538
0
    DL_APPEND(msg_data->inflight, client_msg);
539
0
    db__msg_add_to_inflight_stats(msg_data, client_msg);
540
0
  }
541
542
0
  if(persist && context->is_persisted){
543
0
    plugin_persist__handle_base_msg_add(client_msg->base_msg);
544
0
    plugin_persist__handle_client_msg_add(context, client_msg);
545
0
  }
546
547
0
  if(client_msg->base_msg->data.qos > 0){
548
0
    util__decrement_receive_quota(context);
549
0
  }
550
0
  return rc;
551
0
}
552
553
554
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto__base_msg *base_msg, uint32_t subscription_identifier, bool update, bool persist)
555
0
{
556
0
  struct mosquitto__client_msg *client_msg;
557
0
  struct mosquitto_msg_data *msg_data;
558
0
  enum mosquitto_msg_state state = mosq_ms_invalid;
559
0
  int rc = 0;
560
0
  char **dest_ids;
561
562
0
  assert(base_msg);
563
0
  if(!context){
564
0
    return MOSQ_ERR_INVAL;
565
0
  }
566
0
  if(!context->id){
567
    /* Protect against unlikely "client is disconnected but not entirely freed" scenario */
568
0
    return MOSQ_ERR_SUCCESS;
569
570
0
  }
571
0
  context->stats.messages_sent++;
572
573
0
  msg_data = &context->msgs_out;
574
575
  /* Check whether we've already sent this message to this client
576
   * for outgoing messages only.
577
   * If retain==true then this is a stale retained message and so should be
578
   * sent regardless. FIXME - this does mean retained messages will received
579
   * multiple times for overlapping subscriptions, although this is only the
580
   * case for SUBSCRIPTION with multiple subs in so is a minor concern.
581
   */
582
0
  if(context->protocol != mosq_p_mqtt5
583
0
      && db.config->allow_duplicate_messages == false
584
0
      && retain == false && base_msg->dest_ids){
585
586
0
    for(int i=0; i<base_msg->dest_id_count; i++){
587
0
      if(base_msg->dest_ids[i] && !strcmp(base_msg->dest_ids[i], context->id)){
588
        /* We have already sent this message to this client. */
589
0
        return MOSQ_ERR_SUCCESS;
590
0
      }
591
0
    }
592
0
  }
593
0
  if(!net__is_connected(context)){
594
    /* Client is not connected only queue messages with QoS>0. */
595
0
    if(qos == 0 && !db.config->queue_qos0_messages){
596
0
      if(!context->bridge){
597
0
        return 2;
598
0
      }else{
599
0
        if(context->bridge->start_type != bst_lazy){
600
0
          return 2;
601
0
        }
602
0
      }
603
0
    }
604
0
    if(context->bridge && context->bridge->clean_start_local == true){
605
0
      return 2;
606
0
    }
607
0
  }
608
609
0
  if(net__is_connected(context)){
610
0
    if(db__ready_for_flight(context, mosq_md_out, qos)){
611
0
      switch(qos){
612
0
        case 0:
613
0
          state = mosq_ms_publish_qos0;
614
0
          break;
615
0
        case 1:
616
0
          state = mosq_ms_publish_qos1;
617
0
          break;
618
0
        case 2:
619
0
          state = mosq_ms_publish_qos2;
620
0
          break;
621
0
      }
622
0
    }else if(qos != 0 && db__ready_for_queue(context, qos, msg_data)){
623
0
      state = mosq_ms_queued;
624
0
      rc = 2;
625
0
    }else{
626
      /* Dropping message due to full queue. */
627
0
      if(context->is_dropping == false){
628
0
        context->is_dropping = true;
629
0
        log__printf(NULL, MOSQ_LOG_NOTICE,
630
0
            "Outgoing messages are being dropped for client %s.",
631
0
            context->id);
632
0
      }
633
0
      metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
634
0
      return 2;
635
0
    }
636
0
  }else{
637
0
    if(db__ready_for_queue(context, qos, msg_data)){
638
0
      state = mosq_ms_queued;
639
0
    }else{
640
0
      metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
641
0
      if(context->is_dropping == false){
642
0
        context->is_dropping = true;
643
0
        log__printf(NULL, MOSQ_LOG_NOTICE,
644
0
            "Outgoing messages are being dropped for client %s.",
645
0
            context->id);
646
0
      }
647
0
      return 2;
648
0
    }
649
0
  }
650
0
  assert(state != mosq_ms_invalid);
651
652
0
#ifdef WITH_PERSISTENCE
653
0
  if(state == mosq_ms_queued){
654
0
    db.persistence_changes++;
655
0
  }
656
0
#endif
657
658
0
  client_msg = mosquitto_malloc(sizeof(struct mosquitto__client_msg));
659
0
  if(!client_msg){
660
0
    return MOSQ_ERR_NOMEM;
661
0
  }
662
0
  client_msg->prev = NULL;
663
0
  client_msg->next = NULL;
664
0
  if(cmsg_id){
665
0
    client_msg->data.cmsg_id = cmsg_id;
666
0
  }else{
667
0
    client_msg->data.cmsg_id = ++context->last_cmsg_id;
668
0
  }
669
0
  client_msg->base_msg = base_msg;
670
0
  db__msg_store_ref_inc(client_msg->base_msg);
671
0
  client_msg->data.mid = mid;
672
0
  client_msg->data.direction = mosq_md_out;
673
0
  client_msg->data.state = (enum mosquitto_msg_state)state;
674
0
  client_msg->data.dup = false;
675
0
  if(qos > context->max_qos){
676
0
    client_msg->data.qos = context->max_qos;
677
0
  }else{
678
0
    client_msg->data.qos = qos;
679
0
  }
680
0
  client_msg->data.retain = retain;
681
0
  client_msg->data.subscription_identifier = subscription_identifier;
682
683
0
  if(state == mosq_ms_queued){
684
0
    DL_APPEND(msg_data->queued, client_msg);
685
0
    db__msg_add_to_queued_stats(msg_data, client_msg);
686
0
  }else{
687
0
    DL_APPEND(msg_data->inflight, client_msg);
688
0
    db__msg_add_to_inflight_stats(msg_data, client_msg);
689
0
  }
690
691
0
  if(persist && context->is_persisted){
692
0
    plugin_persist__handle_base_msg_add(client_msg->base_msg);
693
0
    plugin_persist__handle_client_msg_add(context, client_msg);
694
0
  }
695
696
0
  if(db.config->allow_duplicate_messages == false && retain == false){
697
    /* Record which client ids this message has been sent to so we can avoid duplicates.
698
     * Outgoing messages only.
699
     * If retain==true then this is a stale retained message and so should be
700
     * sent regardless. FIXME - this does mean retained messages will received
701
     * multiple times for overlapping subscriptions, although this is only the
702
     * case for SUBSCRIPTION with multiple subs in so is a minor concern.
703
     */
704
0
    dest_ids = mosquitto_realloc(base_msg->dest_ids, sizeof(char *)*(size_t)(base_msg->dest_id_count+1));
705
0
    if(dest_ids){
706
0
      base_msg->dest_ids = dest_ids;
707
0
      base_msg->dest_id_count++;
708
0
      base_msg->dest_ids[base_msg->dest_id_count-1] = mosquitto_strdup(context->id);
709
0
      if(!base_msg->dest_ids[base_msg->dest_id_count-1]){
710
0
        return MOSQ_ERR_NOMEM;
711
0
      }
712
0
    }else{
713
0
      return MOSQ_ERR_NOMEM;
714
0
    }
715
0
  }
716
0
#ifdef WITH_BRIDGE
717
0
  if(context->bridge && context->bridge->start_type == bst_lazy
718
0
      && !net__is_connected(context)
719
0
      && context->msgs_out.inflight_count + context->msgs_out.queued_count >= context->bridge->threshold){
720
721
0
    context->bridge->lazy_reconnect = true;
722
0
  }
723
0
#endif
724
725
0
  if(client_msg->data.qos > 0 && state != mosq_ms_queued){
726
0
    util__decrement_send_quota(context);
727
0
  }
728
729
0
  if(update){
730
0
    rc = db__message_write_inflight_out_latest(context);
731
0
    if(rc){
732
0
      return rc;
733
0
    }
734
0
    rc = db__message_write_queued_out(context);
735
0
    if(rc){
736
0
      return rc;
737
0
    }
738
0
  }
739
740
0
  return rc;
741
0
}
742
743
744
static inline int db__message_update_outgoing_state(struct mosquitto *context, struct mosquitto__client_msg *head,
745
    uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
746
0
{
747
0
  struct mosquitto__client_msg *client_msg;
748
749
0
  DL_FOREACH(head, client_msg){
750
0
    if(client_msg->data.mid == mid){
751
0
      if(client_msg->data.qos != qos){
752
0
        return MOSQ_ERR_PROTOCOL;
753
0
      }
754
0
      client_msg->data.state = (enum mosquitto_msg_state)state;
755
0
      if(persist){
756
0
        plugin_persist__handle_client_msg_update(context, client_msg);
757
0
      }
758
0
      return MOSQ_ERR_SUCCESS;
759
0
    }
760
0
  }
761
0
  return MOSQ_ERR_NOT_FOUND;
762
0
}
763
764
765
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
766
0
{
767
0
  int rc;
768
769
0
  rc = db__message_update_outgoing_state(context, context->msgs_out.inflight, mid, state, qos, persist);
770
0
  if(!persist && rc == MOSQ_ERR_NOT_FOUND){
771
0
    rc = db__message_update_outgoing_state(context, context->msgs_out.queued, mid, state, qos, persist);
772
0
  }
773
0
  return rc;
774
0
}
775
776
777
static void db__messages_delete_list(struct mosquitto__client_msg **head)
778
0
{
779
0
  struct mosquitto__client_msg *client_msg, *tmp;
780
781
0
  DL_FOREACH_SAFE(*head, client_msg, tmp){
782
0
    DL_DELETE(*head, client_msg);
783
0
    db__msg_store_ref_dec(&client_msg->base_msg);
784
0
    mosquitto_FREE(client_msg);
785
0
  }
786
0
  *head = NULL;
787
0
}
788
789
790
int db__messages_delete_incoming(struct mosquitto *context)
791
0
{
792
0
  if(!context){
793
0
    return MOSQ_ERR_INVAL;
794
0
  }
795
796
0
  db__messages_delete_list(&context->msgs_in.inflight);
797
0
  db__messages_delete_list(&context->msgs_in.queued);
798
0
  context->msgs_in.inflight_bytes = 0;
799
0
  context->msgs_in.inflight_bytes12 = 0;
800
0
  context->msgs_in.inflight_count = 0;
801
0
  context->msgs_in.inflight_count12 = 0;
802
0
  context->msgs_in.queued_bytes = 0;
803
0
  context->msgs_in.queued_bytes12 = 0;
804
0
  context->msgs_in.queued_count = 0;
805
0
  context->msgs_in.queued_count12 = 0;
806
807
0
  return MOSQ_ERR_SUCCESS;
808
0
}
809
810
811
int db__messages_delete_outgoing(struct mosquitto *context)
812
0
{
813
0
  if(!context){
814
0
    return MOSQ_ERR_INVAL;
815
0
  }
816
817
0
  db__messages_delete_list(&context->msgs_out.inflight);
818
0
  db__messages_delete_list(&context->msgs_out.queued);
819
0
  context->msgs_out.inflight_bytes = 0;
820
0
  context->msgs_out.inflight_bytes12 = 0;
821
0
  context->msgs_out.inflight_count = 0;
822
0
  context->msgs_out.inflight_count12 = 0;
823
0
  context->msgs_out.queued_bytes = 0;
824
0
  context->msgs_out.queued_bytes12 = 0;
825
0
  context->msgs_out.queued_count = 0;
826
0
  context->msgs_out.queued_count12 = 0;
827
828
0
  return MOSQ_ERR_SUCCESS;
829
0
}
830
831
832
int db__messages_delete(struct mosquitto *context, bool force_free)
833
0
{
834
0
  if(!context){
835
0
    return MOSQ_ERR_INVAL;
836
0
  }
837
838
0
  if(force_free || context->clean_start || (context->bridge && context->bridge->clean_start)){
839
0
    db__messages_delete_incoming(context);
840
0
  }
841
842
0
  if(force_free || (context->bridge && context->bridge->clean_start_local)
843
0
      || (context->bridge == NULL && context->clean_start)){
844
845
0
    db__messages_delete_outgoing(context);
846
0
  }
847
848
0
  return MOSQ_ERR_SUCCESS;
849
0
}
850
851
852
/**
 * Build a base message from the given topic/payload, store it with
 * db__message_store() and hand it to sub__messages_queue().
 *
 * @param context originating client, or NULL for a broker-originated message
 * @param topic topic to publish to; copied. Must not be NULL.
 * @param qos qos stored on the message
 * @param payloadlen number of payload bytes
 * @param payload payload bytes; copied. Must not be NULL when payloadlen > 0.
 * @param retain retain flag; forced to 0 when db.config->retain_available is
 *               false
 * @param message_expiry_interval passed by address to db__message_store()
 * @param properties if non-NULL, *properties is moved into the message and
 *                   *properties is set to NULL
 * @return MOSQ_ERR_INVAL for a NULL topic or a NULL payload with non-zero
 *         payloadlen, MOSQ_ERR_NOMEM on allocation failure, 1 if
 *         db__message_store() fails, otherwise the result of
 *         sub__messages_queue().
 */
int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties)
{
  struct mosquitto__base_msg *base_msg;
  const char *source_id;
  enum mosquitto_msg_origin origin;

  if(!topic){
    return MOSQ_ERR_INVAL;
  }
  if(payloadlen > 0 && payload == NULL){
    /* memcpy() from a NULL source is undefined behaviour. */
    return MOSQ_ERR_INVAL;
  }

  base_msg = mosquitto_calloc(1, sizeof(struct mosquitto__base_msg));
  if(base_msg == NULL){
    return MOSQ_ERR_NOMEM;
  }

  base_msg->data.topic = mosquitto_strdup(topic);
  if(base_msg->data.topic == NULL){
    /* topic is known to be non-NULL, so this is an allocation failure. */
    db__msg_store_free(base_msg);
    return MOSQ_ERR_NOMEM;
  }

  base_msg->data.qos = qos;
  if(db.config->retain_available == false){
    base_msg->data.retain = 0;
  }else{
    base_msg->data.retain = retain;
  }

  base_msg->data.payloadlen = payloadlen;
  if(payloadlen > 0){
    /* Widen before adding the terminator byte so the size cannot wrap in
     * 32-bit arithmetic. */
    base_msg->data.payload = mosquitto_malloc((size_t)payloadlen + 1);
    if(base_msg->data.payload == NULL){
      db__msg_store_free(base_msg);
      return MOSQ_ERR_NOMEM;
    }
    /* Ensure payload is always zero terminated, this is the reason for the extra byte above */
    ((uint8_t *)base_msg->data.payload)[payloadlen] = 0;
    memcpy(base_msg->data.payload, payload, payloadlen);
  }

  if(context && context->id){
    source_id = context->id;
  }else{
    source_id = "";
  }
  if(properties){
    /* Take ownership of the caller's property list. */
    base_msg->data.properties = *properties;
    *properties = NULL;
  }

  if(context){
    origin = mosq_mo_client;
  }else{
    origin = mosq_mo_broker;
  }
  if(db__message_store(context, base_msg, &message_expiry_interval, origin)){
    return 1;
  }

  return sub__messages_queue(source_id, base_msg->data.topic, base_msg->data.qos, base_msg->data.retain, &base_msg);
}
913
914
915
0
#define MOSQ_UUID_EPOCH 1637168273
916
917
918
/* db__new_msg_id() attempts to generate a new unique id on the broker, or a
919
 * number of brokers. It uses the 10-bit node ID, which can be set by plugins
920
 * to allow different brokers to share the same plugin persistence database
921
 * without overlapping one another.
922
 *
923
 * The message ID is a 64-bit unsigned integer arranged as follows:
924
 *
925
 * 10-bit ID  31-bit seconds                 23-bit fractional seconds
926
 * iiiiiiiiiisssssssssssssssssssssssssssssssnnnnnnnnnnnnnnnnnnnnnnn
927
 *
928
 * 10-bit ID gives a total of 1024 brokers can produce unique values (complete overkill)
929
 * 31-bit seconds gives a roll over date of 68 years after MOSQ_UUID_EPOCH - 2089.
930
 *    This roll over date would affect messages that have been queued waiting
931
 *    for a client to receive them, or retained messages only. If either of
932
 *    those remains for 68 years unchanged, then there will potentially be a
933
 *    collision. Ideally we need to ensure, however, that the message id is
934
 *    continually increasing for sorting purposes.
935
 * 23-bit fractional seconds gives a resolution of 120ns, or 8.4 million
936
 *    messages per second per broker.
937
 */
938
uint64_t db__new_msg_id(void)
939
0
{
940
#ifdef WIN32
941
  FILETIME ftime;
942
  uint64_t ftime64;
943
#else
944
0
  struct timespec ts;
945
0
#endif
946
0
  uint64_t id;
947
0
  uint64_t tmp;
948
0
  time_t sec;
949
0
  long nsec;
950
951
0
  id = db.node_id_shifted; /* Top 10-bits */
952
953
#ifdef WIN32
954
  GetSystemTimePreciseAsFileTime(&ftime);
955
  ftime64 = (((uint64_t)ftime.dwHighDateTime)<<32) + ftime.dwLowDateTime;
956
  tmp = ftime64 - 116444736000000000LL; /* Convert offset to unix epoch, still in counts of 100ns */
957
  sec = tmp / 10000000; /* Convert to seconds */
958
  nsec = (long)(tmp - sec)*100; /* Remove seconds, convert to counts of 1ns */
959
#else
960
0
  clock_gettime(CLOCK_REALTIME, &ts);
961
0
  sec = ts.tv_sec;
962
0
  nsec = ts.tv_nsec;
963
0
#endif
964
0
  tmp = (sec - MOSQ_UUID_EPOCH) & 0x7FFFFFFF;
965
0
  id = id | (tmp << 23); /* Seconds, 31-bits (68 years) */
966
967
0
  tmp = (nsec & 0x7FFFFF80); /* top 23-bits of the bottom 30 bits (1 billion ns), ~100 ns resolution */
968
0
  id = id | (tmp >> 7);
969
970
0
  if(id <= db.last_db_id){
971
0
    id = db.last_db_id + 1;
972
0
  }
973
0
  db.last_db_id = id;
974
975
0
  return id;
976
0
}
977
978
979
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
980
int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg *base_msg, uint32_t *message_expiry_interval, enum mosquitto_msg_origin origin)
981
0
{
982
0
  int rc;
983
984
0
  assert(base_msg);
985
986
0
  if(source && source->id){
987
0
    base_msg->data.source_id = mosquitto_strdup(source->id);
988
0
  }else{
989
0
    base_msg->data.source_id = mosquitto_strdup("");
990
0
  }
991
0
  if(!base_msg->data.source_id){
992
0
    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
993
0
    db__msg_store_free(base_msg);
994
0
    return MOSQ_ERR_NOMEM;
995
0
  }
996
997
0
  if(source && source->username){
998
0
    base_msg->data.source_username = mosquitto_strdup(source->username);
999
0
    if(!base_msg->data.source_username){
1000
0
      db__msg_store_free(base_msg);
1001
0
      return MOSQ_ERR_NOMEM;
1002
0
    }
1003
0
  }
1004
0
  if(source){
1005
0
    base_msg->source_listener = source->listener;
1006
0
  }
1007
0
  base_msg->origin = origin;
1008
0
  if(message_expiry_interval){
1009
0
    if(*message_expiry_interval > 0 && *message_expiry_interval != MSG_EXPIRY_INFINITE){
1010
0
      base_msg->data.expiry_time = db.now_real_s + *message_expiry_interval;
1011
0
    }else{
1012
0
      base_msg->data.expiry_time = 0;
1013
0
    }
1014
0
  }
1015
1016
0
  base_msg->dest_ids = NULL;
1017
0
  base_msg->dest_id_count = 0;
1018
0
  db.msg_store_count++;
1019
0
  db.msg_store_bytes += base_msg->data.payloadlen;
1020
1021
0
  if(!base_msg->data.store_id){
1022
0
    base_msg->data.store_id = db__new_msg_id();
1023
0
  }
1024
1025
0
  rc = db__msg_store_add(base_msg);
1026
0
  if(rc){
1027
0
    db__msg_store_free(base_msg);
1028
0
    return rc;
1029
0
  }
1030
1031
0
  return MOSQ_ERR_SUCCESS;
1032
0
}
1033
1034
1035
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto__client_msg **client_msg)
1036
0
{
1037
0
  struct mosquitto__client_msg *cmsg;
1038
1039
0
  *client_msg = NULL;
1040
1041
0
  if(!context){
1042
0
    return MOSQ_ERR_INVAL;
1043
0
  }
1044
1045
0
  DL_FOREACH(context->msgs_in.inflight, cmsg){
1046
0
    if(cmsg->base_msg->data.source_mid == mid){
1047
0
      *client_msg = cmsg;
1048
0
      return MOSQ_ERR_SUCCESS;
1049
0
    }
1050
0
  }
1051
1052
0
  DL_FOREACH(context->msgs_in.queued, cmsg){
1053
0
    if(cmsg->base_msg->data.source_mid == mid){
1054
0
      *client_msg = cmsg;
1055
0
      return MOSQ_ERR_SUCCESS;
1056
0
    }
1057
0
  }
1058
1059
0
  return 1;
1060
0
}
1061
1062
1063
/* Called on reconnect to set outgoing messages to a sensible state and force a
1064
 * retry, and to set incoming messages to expect an appropriate retry. */
1065
static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
1066
0
{
1067
0
  struct mosquitto__client_msg *client_msg, *tmp;
1068
1069
0
  context->msgs_out.inflight_bytes = 0;
1070
0
  context->msgs_out.inflight_bytes12 = 0;
1071
0
  context->msgs_out.inflight_count = 0;
1072
0
  context->msgs_out.inflight_count12 = 0;
1073
0
  context->msgs_out.queued_bytes = 0;
1074
0
  context->msgs_out.queued_bytes12 = 0;
1075
0
  context->msgs_out.queued_count = 0;
1076
0
  context->msgs_out.queued_count12 = 0;
1077
0
  context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum;
1078
1079
0
  DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){
1080
0
    db__msg_add_to_inflight_stats(&context->msgs_out, client_msg);
1081
0
    if(client_msg->data.qos > 0){
1082
0
      util__decrement_send_quota(context);
1083
0
    }
1084
1085
0
    switch(client_msg->data.qos){
1086
0
      case 0:
1087
0
        client_msg->data.state = mosq_ms_publish_qos0;
1088
0
        break;
1089
0
      case 1:
1090
0
        client_msg->data.state = mosq_ms_publish_qos1;
1091
0
        break;
1092
0
      case 2:
1093
0
        if(client_msg->data.state == mosq_ms_wait_for_pubcomp){
1094
0
          client_msg->data.state = mosq_ms_resend_pubrel;
1095
0
        }else{
1096
0
          client_msg->data.state = mosq_ms_publish_qos2;
1097
0
        }
1098
0
        break;
1099
0
    }
1100
0
    plugin_persist__handle_client_msg_update(context, client_msg);
1101
0
  }
1102
  /* Messages received when the client was disconnected are put
1103
   * in the mosq_ms_queued state. If we don't change them to the
1104
   * appropriate "publish" state, then the queued messages won't
1105
   * get sent until the client next receives a message - and they
1106
   * will be sent out of order.
1107
   */
1108
0
  DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
1109
0
    db__msg_add_to_queued_stats(&context->msgs_out, client_msg);
1110
0
  }
1111
0
  db__fill_inflight_out_from_queue(context);
1112
1113
0
  return MOSQ_ERR_SUCCESS;
1114
0
}
1115
1116
1117
/* Called on reconnect to set incoming messages to expect an appropriate retry. */
1118
static int db__message_reconnect_reset_incoming(struct mosquitto *context)
1119
0
{
1120
0
  struct mosquitto__client_msg *client_msg, *tmp;
1121
1122
0
  context->msgs_in.inflight_bytes = 0;
1123
0
  context->msgs_in.inflight_bytes12 = 0;
1124
0
  context->msgs_in.inflight_count = 0;
1125
0
  context->msgs_in.inflight_count12 = 0;
1126
0
  context->msgs_in.queued_bytes = 0;
1127
0
  context->msgs_in.queued_bytes12 = 0;
1128
0
  context->msgs_in.queued_count = 0;
1129
0
  context->msgs_in.queued_count12 = 0;
1130
0
  context->msgs_in.inflight_quota = context->msgs_in.inflight_maximum;
1131
1132
0
  DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){
1133
0
    db__msg_add_to_inflight_stats(&context->msgs_in, client_msg);
1134
0
    if(client_msg->data.qos > 0){
1135
0
      util__decrement_receive_quota(context);
1136
0
    }
1137
1138
0
    if(client_msg->data.qos != 2){
1139
      /* Anything <QoS 2 can be completely retried by the client at
1140
       * no harm. */
1141
0
      db__message_remove_inflight(context, &context->msgs_in, client_msg);
1142
0
    }else{
1143
      /* Message state can be preserved here because it should match
1144
       * whatever the client has got. */
1145
0
      client_msg->data.dup = 0;
1146
0
    }
1147
0
  }
1148
1149
  /* Messages received when the client was disconnected are put
1150
   * in the mosq_ms_queued state. If we don't change them to the
1151
   * appropriate "publish" state, then the queued messages won't
1152
   * get sent until the client next receives a message - and they
1153
   * will be sent out of order.
1154
   */
1155
0
  DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){
1156
0
    client_msg->data.dup = 0;
1157
0
    db__msg_add_to_queued_stats(&context->msgs_in, client_msg);
1158
0
    if(db__ready_for_flight(context, mosq_md_in, client_msg->data.qos)){
1159
0
      switch(client_msg->data.qos){
1160
0
        case 0:
1161
0
          client_msg->data.state = mosq_ms_publish_qos0;
1162
0
          break;
1163
0
        case 1:
1164
0
          client_msg->data.state = mosq_ms_publish_qos1;
1165
0
          break;
1166
0
        case 2:
1167
0
          client_msg->data.state = mosq_ms_publish_qos2;
1168
0
          break;
1169
0
      }
1170
0
      db__message_dequeue_first(context, &context->msgs_in);
1171
0
      plugin_persist__handle_client_msg_update(context, client_msg);
1172
0
    }
1173
0
  }
1174
1175
0
  return MOSQ_ERR_SUCCESS;
1176
0
}
1177
1178
1179
/* Reset message state for a reconnecting client: outgoing messages first,
 * then incoming. If the outgoing reset reports an error it is returned
 * immediately and the incoming reset is not attempted. */
int db__message_reconnect_reset(struct mosquitto *context)
{
  int rc = db__message_reconnect_reset_outgoing(context);

  return rc ? rc : db__message_reconnect_reset_incoming(context);
}
1189
1190
1191
int db__message_remove_incoming(struct mosquitto *context, uint16_t mid)
1192
0
{
1193
0
  struct mosquitto__client_msg *client_msg, *tmp;
1194
1195
0
  if(!context){
1196
0
    return MOSQ_ERR_INVAL;
1197
0
  }
1198
1199
0
  DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){
1200
0
    if(client_msg->data.mid == mid){
1201
0
      if(client_msg->base_msg->data.qos != 2){
1202
0
        return MOSQ_ERR_PROTOCOL;
1203
0
      }
1204
0
      db__message_remove_inflight(context, &context->msgs_in, client_msg);
1205
0
      return MOSQ_ERR_SUCCESS;
1206
0
    }
1207
0
  }
1208
1209
0
  return MOSQ_ERR_NOT_FOUND;
1210
0
}
1211
1212
1213
int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
1214
0
{
1215
0
  struct mosquitto__client_msg *client_msg, *tmp;
1216
0
  int retain;
1217
0
  char *topic;
1218
0
  char *source_id;
1219
0
  bool deleted = false;
1220
0
  int rc;
1221
1222
0
  if(!context){
1223
0
    return MOSQ_ERR_INVAL;
1224
0
  }
1225
1226
0
  DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){
1227
0
    if(client_msg->data.mid == mid){
1228
0
      if(client_msg->base_msg->data.qos != 2){
1229
0
        return MOSQ_ERR_PROTOCOL;
1230
0
      }
1231
0
      topic = client_msg->base_msg->data.topic;
1232
0
      retain = client_msg->data.retain;
1233
0
      source_id = client_msg->base_msg->data.source_id;
1234
1235
      /* topic==NULL should be a QoS 2 message that was
1236
       * denied/dropped and is being processed so the client doesn't
1237
       * keep resending it. That means we don't send it to other
1238
       * clients. */
1239
0
      if(topic == NULL){
1240
0
        db__message_remove_inflight(context, &context->msgs_in, client_msg);
1241
0
        deleted = true;
1242
0
      }else{
1243
0
        rc = sub__messages_queue(source_id, topic, 2, retain, &client_msg->base_msg);
1244
0
        if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){
1245
0
          db__message_remove_inflight(context, &context->msgs_in, client_msg);
1246
0
          deleted = true;
1247
0
        }else{
1248
0
          return 1;
1249
0
        }
1250
0
      }
1251
0
    }
1252
0
  }
1253
1254
0
  DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){
1255
0
    if(db__ready_for_flight(context, mosq_md_in, client_msg->data.qos)){
1256
0
      break;
1257
0
    }
1258
1259
0
    if(client_msg->data.qos == 2){
1260
0
      send__pubrec(context, client_msg->data.mid, 0, NULL);
1261
0
      client_msg->data.state = mosq_ms_wait_for_pubrel;
1262
0
      db__message_dequeue_first(context, &context->msgs_in);
1263
0
      plugin_persist__handle_client_msg_update(context, client_msg);
1264
0
    }
1265
0
  }
1266
0
  if(deleted){
1267
0
    return MOSQ_ERR_SUCCESS;
1268
0
  }else{
1269
0
    return MOSQ_ERR_NOT_FOUND;
1270
0
  }
1271
0
}
1272
1273
1274
void db__expire_all_messages(struct mosquitto *context)
1275
0
{
1276
0
  struct mosquitto__client_msg *client_msg, *tmp;
1277
1278
0
  DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){
1279
0
    if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
1280
0
      if(client_msg->data.qos > 0){
1281
0
        util__increment_send_quota(context);
1282
0
      }
1283
0
      db__message_remove_inflight(context, &context->msgs_out, client_msg);
1284
0
    }
1285
0
  }
1286
0
  db__fill_inflight_out_from_queue(context);
1287
0
  DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
1288
0
    if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
1289
0
      db__message_remove_queued(context, &context->msgs_out, client_msg);
1290
0
    }
1291
0
  }
1292
0
  DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){
1293
0
    if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
1294
0
      if(client_msg->data.qos > 0){
1295
0
        util__increment_receive_quota(context);
1296
0
      }
1297
0
      db__message_remove_inflight(context, &context->msgs_in, client_msg);
1298
0
    }
1299
0
  }
1300
0
  DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){
1301
0
    if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
1302
0
      db__message_remove_queued(context, &context->msgs_in, client_msg);
1303
0
    }
1304
0
  }
1305
0
}
1306
1307
1308
static void db__client_messages_check_acl(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg **head,
1309
    void (*decrement_stats_fn)(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg))
1310
0
{
1311
0
  struct mosquitto__client_msg *client_msg, *tmp;
1312
0
  struct mosquitto__base_msg *base_msg;
1313
0
  int access;
1314
1315
0
  DL_FOREACH_SAFE((*head), client_msg, tmp){
1316
0
    base_msg = client_msg->base_msg;
1317
0
    if(client_msg->data.direction == mosq_md_out){
1318
0
      access = MOSQ_ACL_READ;
1319
0
    }else{
1320
0
      access = MOSQ_ACL_WRITE;
1321
0
    }
1322
0
    if(mosquitto_acl_check(context, base_msg->data.topic,
1323
0
        base_msg->data.payloadlen, base_msg->data.payload,
1324
0
        base_msg->data.qos, base_msg->data.retain, access) != MOSQ_ERR_SUCCESS){
1325
1326
0
      DL_DELETE((*head), client_msg);
1327
0
      decrement_stats_fn(msg_data, client_msg);
1328
0
      plugin_persist__handle_client_msg_delete(context, client_msg);
1329
0
      db__msg_store_ref_dec(&client_msg->base_msg);
1330
0
      mosquitto_FREE(client_msg);
1331
0
    }
1332
0
  }
1333
0
}
1334
1335
1336
void db__check_acl_of_all_messages(struct mosquitto *context)
1337
0
{
1338
0
  db__client_messages_check_acl(context, &context->msgs_in, &context->msgs_in.inflight, &db__msg_remove_from_inflight_stats);
1339
0
  db__client_messages_check_acl(context, &context->msgs_in, &context->msgs_in.queued, &db__msg_remove_from_queued_stats);
1340
0
  db__client_messages_check_acl(context, &context->msgs_out, &context->msgs_out.inflight, &db__msg_remove_from_inflight_stats);
1341
0
  db__client_messages_check_acl(context, &context->msgs_out, &context->msgs_out.queued, &db__msg_remove_from_queued_stats);
1342
0
}
1343
1344
1345
static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto__client_msg *client_msg)
1346
0
{
1347
0
  struct mosquitto__base_msg *base_msg;
1348
0
  mosquitto_property *base_msg_props = NULL;
1349
0
  int rc;
1350
0
  uint16_t mid;
1351
0
  int retries;
1352
0
  int retain;
1353
0
  const char *topic;
1354
0
  uint8_t qos;
1355
0
  uint32_t payloadlen;
1356
0
  const void *payload;
1357
0
  uint32_t expiry_interval;
1358
0
  uint32_t subscription_id;
1359
1360
0
  base_msg = client_msg->base_msg;
1361
1362
0
  expiry_interval = 0;
1363
0
  if(base_msg->data.expiry_time){
1364
0
    if(db.now_real_s > base_msg->data.expiry_time){
1365
      /* Message is expired, must not send. */
1366
0
      if(client_msg->data.direction == mosq_md_out && client_msg->data.qos > 0){
1367
0
        util__increment_send_quota(context);
1368
0
      }
1369
0
      db__message_remove_inflight(context, &context->msgs_out, client_msg);
1370
0
      db__fill_inflight_out_from_queue(context);
1371
0
      return MOSQ_ERR_SUCCESS;
1372
0
    }else{
1373
0
      expiry_interval = (uint32_t)(base_msg->data.expiry_time - db.now_real_s);
1374
0
    }
1375
0
  }
1376
0
  mid = client_msg->data.mid;
1377
0
  retries = client_msg->data.dup;
1378
0
  retain = client_msg->data.retain;
1379
0
  topic = base_msg->data.topic;
1380
0
  qos = (uint8_t)client_msg->data.qos;
1381
0
  payloadlen = base_msg->data.payloadlen;
1382
0
  payload = base_msg->data.payload;
1383
0
  subscription_id = client_msg->data.subscription_identifier;
1384
0
  base_msg_props = base_msg->data.properties;
1385
1386
0
  switch(client_msg->data.state){
1387
0
    case mosq_ms_publish_qos0:
1388
0
      rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval);
1389
0
      if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
1390
0
        db__message_remove_inflight(context, &context->msgs_out, client_msg);
1391
0
      }else{
1392
0
        return rc;
1393
0
      }
1394
0
      break;
1395
1396
0
    case mosq_ms_publish_qos1:
1397
0
      rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval);
1398
0
      if(rc == MOSQ_ERR_SUCCESS){
1399
0
        client_msg->data.dup = 1; /* Any retry attempts are a duplicate. */
1400
0
        client_msg->data.state = mosq_ms_wait_for_puback;
1401
0
        plugin_persist__handle_client_msg_update(context, client_msg);
1402
0
      }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
1403
0
        db__message_remove_inflight(context, &context->msgs_out, client_msg);
1404
0
      }else{
1405
0
        return rc;
1406
0
      }
1407
0
      break;
1408
1409
0
    case mosq_ms_publish_qos2:
1410
0
      rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval);
1411
0
      if(rc == MOSQ_ERR_SUCCESS){
1412
0
        client_msg->data.dup = 1; /* Any retry attempts are a duplicate. */
1413
0
        client_msg->data.state = mosq_ms_wait_for_pubrec;
1414
0
        plugin_persist__handle_client_msg_update(context, client_msg);
1415
0
      }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
1416
0
        db__message_remove_inflight(context, &context->msgs_out, client_msg);
1417
0
      }else{
1418
0
        return rc;
1419
0
      }
1420
0
      break;
1421
1422
0
    case mosq_ms_resend_pubrel:
1423
0
      rc = send__pubrel(context, mid, NULL);
1424
0
      if(!rc){
1425
0
        client_msg->data.state = mosq_ms_wait_for_pubcomp;
1426
0
        plugin_persist__handle_client_msg_update(context, client_msg);
1427
0
      }else{
1428
0
        return rc;
1429
0
      }
1430
0
      break;
1431
1432
0
    case mosq_ms_invalid:
1433
0
    case mosq_ms_send_pubrec:
1434
0
    case mosq_ms_resend_pubcomp:
1435
0
    case mosq_ms_wait_for_puback:
1436
0
    case mosq_ms_wait_for_pubrec:
1437
0
    case mosq_ms_wait_for_pubrel:
1438
0
    case mosq_ms_wait_for_pubcomp:
1439
0
    case mosq_ms_queued:
1440
0
      break;
1441
0
  }
1442
0
  return MOSQ_ERR_SUCCESS;
1443
0
}
1444
1445
1446
int db__message_write_inflight_out_all(struct mosquitto *context)
1447
0
{
1448
0
  struct mosquitto__client_msg *client_msg, *tmp;
1449
0
  int rc;
1450
1451
0
  if(context->state != mosq_cs_active || !net__is_connected(context)){
1452
0
    return MOSQ_ERR_SUCCESS;
1453
0
  }
1454
1455
0
  DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){
1456
0
    rc = db__message_write_inflight_out_single(context, client_msg);
1457
0
    if(rc){
1458
0
      return rc;
1459
0
    }
1460
0
  }
1461
0
  return MOSQ_ERR_SUCCESS;
1462
0
}
1463
1464
1465
int db__message_write_inflight_out_latest(struct mosquitto *context)
1466
0
{
1467
0
  struct mosquitto__client_msg *client_msg, *next;
1468
0
  int rc;
1469
1470
0
  if(context->state != mosq_cs_active
1471
0
      || !net__is_connected(context)
1472
0
      || context->msgs_out.inflight == NULL){
1473
1474
0
    return MOSQ_ERR_SUCCESS;
1475
0
  }
1476
1477
0
  if(context->msgs_out.inflight->prev == context->msgs_out.inflight){
1478
    /* Only one message */
1479
0
    return db__message_write_inflight_out_single(context, context->msgs_out.inflight);
1480
0
  }
1481
1482
  /* Start at the end of the list and work backwards looking for the first
1483
   * message in a non-publish state */
1484
0
  client_msg = context->msgs_out.inflight->prev;
1485
0
  while(client_msg != context->msgs_out.inflight &&
1486
0
      (client_msg->data.state == mosq_ms_publish_qos0
1487
0
      || client_msg->data.state == mosq_ms_publish_qos1
1488
0
      || client_msg->data.state == mosq_ms_publish_qos2)){
1489
1490
0
    client_msg = client_msg->prev;
1491
0
  }
1492
1493
  /* Tail is now either the head of the list, if that message is waiting for
1494
   * publish, or the oldest message not waiting for a publish. In the latter
1495
   * case, any pending publishes should be next after this message. */
1496
0
  if(client_msg != context->msgs_out.inflight){
1497
0
    client_msg = client_msg->next;
1498
0
  }
1499
1500
0
  while(client_msg){
1501
0
    next = client_msg->next;
1502
0
    rc = db__message_write_inflight_out_single(context, client_msg);
1503
0
    if(rc){
1504
0
      return rc;
1505
0
    }
1506
0
    client_msg = next;
1507
0
  }
1508
0
  return MOSQ_ERR_SUCCESS;
1509
0
}
1510
1511
1512
int db__message_write_queued_in(struct mosquitto *context)
1513
0
{
1514
0
  struct mosquitto__client_msg *client_msg, *tmp;
1515
0
  int rc;
1516
1517
0
  if(context->state != mosq_cs_active){
1518
0
    return MOSQ_ERR_SUCCESS;
1519
0
  }
1520
1521
0
  DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){
1522
0
    if(context->msgs_in.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){
1523
0
      break;
1524
0
    }
1525
1526
0
    if(client_msg->data.qos == 2){
1527
0
      client_msg->data.state = mosq_ms_send_pubrec;
1528
0
      db__message_dequeue_first(context, &context->msgs_in);
1529
0
      rc = send__pubrec(context, client_msg->data.mid, 0, NULL);
1530
0
      if(!rc){
1531
0
        client_msg->data.state = mosq_ms_wait_for_pubrel;
1532
0
        plugin_persist__handle_client_msg_update(context, client_msg);
1533
0
      }else{
1534
0
        plugin_persist__handle_client_msg_update(context, client_msg);
1535
0
        return rc;
1536
0
      }
1537
0
    }
1538
0
  }
1539
0
  return MOSQ_ERR_SUCCESS;
1540
0
}
1541
1542
1543
int db__message_write_queued_out(struct mosquitto *context)
1544
0
{
1545
0
  if(context->state != mosq_cs_active){
1546
0
    return MOSQ_ERR_SUCCESS;
1547
0
  }
1548
1549
0
  db__fill_inflight_out_from_queue(context);
1550
1551
0
  return MOSQ_ERR_SUCCESS;
1552
0
}