Coverage Report

Created: 2025-10-24 06:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/src/keepalive.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
#include <time.h>
21
#include "mosquitto_broker_internal.h"
22
#include <utlist.h>
23
24
25
/* This contains code for checking whether clients have exceeded their keepalive timeouts.
26
 * There are two versions.
27
 *
28
 * The old version can be used by compiling with `make WITH_OLD_KEEPALIVE=yes`.
29
 * It will scan the entire list of connected clients every 5 seconds to see if
30
 * they have expired. Hence it scales with O(n) and with e.g. 60000 clients can
31
 * have a measurable effect on CPU usage in the low single digit percent range.
32
 *
33
 * The new version scales with O(1). It uses a ring buffer that contains
34
 * max_keepalive*1.5+1 entries. The current time in integer seconds, modulus
35
 * the number of entries, points to the head of the ring buffer. Any clients
36
 * will appear after this point at the position indexed by the time at which
37
 * they will expire if they do not send another message, assuming they do not
38
 * have keepalive==0 - in which case they are not part of this check. So a
39
 * client that connects with keepalive=60 will be added at `now + 60*1.5`.
40
 *
41
 * A client is added to an entry with a doubly linked list. When the client
42
 * sends a new message, it is removed from the old position and added to the
43
 * new.
44
 *
45
 * As time moves on, if the linked list at the current entry is not empty, all
46
 * of the clients are expired.
47
 *
48
 * The ring buffer size is determined by max_keepalive. At the default, it is
49
 * 65535*1.5+1=98303 entries long. On a 64-bit machine that is 786424 bytes.
50
 * If this is too big a burden and you do not need many clients connected, then
51
 * the old check is sufficient. You can reduce the number of entries by setting
52
 * a lower max_keepalive value. A value as low as 600 still gives a 10 minute
53
 * keepalive and reduces the memory for the ring buffer to 7208 bytes.
54
 *
55
 * *NOTE* It is likely that the old check routine will be removed in the
56
 * future, and max_keepalive set to a sensible default value. If this is a
57
 * problem for you please get in touch.
58
 */
59
60
static time_t last_keepalive_check = 0;
61
#ifndef WITH_OLD_KEEPALIVE
62
static int keepalive_list_max = 0;
63
static struct mosquitto **keepalive_list = NULL;
64
#endif
65
66
#ifndef WITH_OLD_KEEPALIVE
67
68
69
static int calc_index(struct mosquitto *context)
70
0
{
71
0
  return (int)(context->last_msg_in + context->keepalive*3/2) % keepalive_list_max;
72
0
}
73
#endif
74
75
76
int keepalive__init(void)
77
0
{
78
0
#ifndef WITH_OLD_KEEPALIVE
79
0
  struct mosquitto *context, *ctxt_tmp;
80
81
0
  if(db.config->max_keepalive <= 0){
82
0
    keepalive_list_max = (UINT16_MAX * 3)/2 + 1;
83
0
  }else{
84
0
    keepalive_list_max = (db.config->max_keepalive * 3)/2 + 1;
85
0
  }
86
0
  keepalive_list = mosquitto_calloc((size_t)keepalive_list_max, sizeof(struct mosquitto *));
87
0
  if(keepalive_list == NULL){
88
0
    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
89
0
    keepalive_list_max = 0;
90
0
    return MOSQ_ERR_NOMEM;
91
0
  }
92
93
  /* Add existing clients - should only be applicable on MOSQ_EVT_RELOAD */
94
0
  HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
95
0
    if(net__is_connected(context) && !context->bridge && context->keepalive > 0){
96
0
      keepalive__add(context);
97
0
    }
98
0
  }
99
0
#endif
100
0
  last_keepalive_check = db.now_s;
101
0
  return MOSQ_ERR_SUCCESS;
102
0
}
103
104
105
void keepalive__cleanup(void)
106
0
{
107
0
#ifndef WITH_OLD_KEEPALIVE
108
0
  for(int idx=0; idx<keepalive_list_max; idx++){
109
0
    struct mosquitto *context, *ctxt_tmp;
110
0
    DL_FOREACH_SAFE2(keepalive_list[idx], context, ctxt_tmp, keepalive_next){
111
0
      DL_DELETE2(keepalive_list[idx], context, keepalive_prev, keepalive_next);
112
0
    }
113
0
  }
114
0
  mosquitto_free(keepalive_list);
115
0
  keepalive_list = NULL;
116
0
  keepalive_list_max = 0;
117
0
#endif
118
0
}
119
120
121
int keepalive__add(struct mosquitto *context)
122
0
{
123
0
#ifndef WITH_OLD_KEEPALIVE
124
0
  if(context->keepalive <= 0 || !net__is_connected(context)){
125
0
    return MOSQ_ERR_SUCCESS;
126
0
  }
127
0
#ifdef WITH_BRIDGE
128
0
  if(context->bridge){
129
0
    return MOSQ_ERR_SUCCESS;
130
0
  }
131
0
#endif
132
133
0
  DL_APPEND2(keepalive_list[calc_index(context)], context, keepalive_prev, keepalive_next);
134
0
  context->keepalive_add_time = db.now_s;
135
#else
136
  UNUSED(context);
137
#endif
138
0
  return MOSQ_ERR_SUCCESS;
139
0
}
140
141
142
#ifndef WITH_OLD_KEEPALIVE
143
144
145
void keepalive__check(void)
146
0
{
147
0
  struct mosquitto *context, *ctxt_tmp;
148
0
  time_t timeout;
149
150
0
  if(db.contexts_by_sock){
151
    /* Check the next 5 seconds for upcoming expiries */
152
    /* FIXME - find the actual next entry without having to iterate over
153
     * the whole list */
154
0
    timeout = 5;
155
0
    for(time_t i=5; i>0; i--){
156
0
      if(keepalive_list[(db.now_s + i) % keepalive_list_max]){
157
0
        timeout = i;
158
0
      }
159
0
    }
160
0
    loop__update_next_event(timeout*1000);
161
0
  }
162
0
  for(time_t i=last_keepalive_check; i<db.now_s; i++){
163
0
    int idx = (int)(i % keepalive_list_max);
164
0
    if(keepalive_list[idx]){
165
0
      DL_FOREACH_SAFE2(keepalive_list[idx], context, ctxt_tmp, keepalive_next){
166
        /* keepalive_add_time lets us account for the client adding itself to the keepalive
167
         * list when its last_msg_in value is greater than the last_keepalive_check.
168
         * Without this, the client would be expired if it has keepalive == max_keepalive.
169
         */
170
0
        if(context->keepalive_add_time <= last_keepalive_check && net__is_connected(context)){
171
          /* Client has exceeded keepalive*1.5 */
172
0
          do_disconnect(context, MOSQ_ERR_KEEPALIVE);
173
0
        }
174
0
      }
175
0
    }
176
0
  }
177
178
0
  last_keepalive_check = db.now_s;
179
0
}
180
#else
181
182
183
void keepalive__check(void)
184
{
185
  struct mosquitto *context, *ctxt_tmp;
186
  time_t timeout;
187
188
  if(db.contexts_by_sock){
189
    timeout = (last_keepalive_check + 5 - db.now_s);
190
    if(timeout <= 0){
191
      timeout = 5;
192
    }
193
    loop__update_next_event(timeout*1000);
194
  }
195
  if(last_keepalive_check + 5 <= db.now_s){
196
    last_keepalive_check = db.now_s;
197
198
    HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
199
      if(net__is_connected(context)){
200
        /* Local bridges never time out in this fashion. */
201
        if(!(context->keepalive)
202
            || context->bridge
203
            || db.now_s - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
204
205
        }else{
206
          /* Client has exceeded keepalive*1.5 */
207
          do_disconnect(context, MOSQ_ERR_KEEPALIVE);
208
        }
209
      }
210
    }
211
  }
212
}
213
#endif
214
215
216
int keepalive__remove(struct mosquitto *context)
217
1.29k
{
218
1.29k
#ifndef WITH_OLD_KEEPALIVE
219
1.29k
  int idx;
220
221
1.29k
  if(context->keepalive <= 0 || context->keepalive_prev == NULL){
222
1.29k
    return MOSQ_ERR_SUCCESS;
223
1.29k
  }
224
225
0
  idx = calc_index(context);
226
0
  if(keepalive_list[idx]){
227
0
    DL_DELETE2(keepalive_list[idx], context, keepalive_prev, keepalive_next);
228
0
    context->keepalive_next = NULL;
229
0
    context->keepalive_prev = NULL;
230
0
  }
231
#else
232
  UNUSED(context);
233
#endif
234
0
  return MOSQ_ERR_SUCCESS;
235
0
}
236
237
238
int keepalive__update(struct mosquitto *context)
239
0
{
240
0
#ifndef WITH_OLD_KEEPALIVE
241
0
  keepalive__remove(context);
242
  /* coverity[missing_lock] - broker is single threaded, so no lock required */
243
0
  context->last_msg_in = db.now_s;
244
0
  keepalive__add(context);
245
#else
246
  UNUSED(context);
247
#endif
248
0
  return MOSQ_ERR_SUCCESS;
249
0
}